migrate streamstore details to own file (#2725)
Initial code movement before getting into larger refactor. The goal is to refactor the stream store so that it can comply with multiple different writers and readers beyond just the details.Details struct. This will allow us to use the stream store to ingest large lists of errors fault items without burdening the backup manifest. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🧹 Tech Debt/Cleanup #### Issue(s) * #2708 #### Test Plan - [x] ⚡ Unit test
This commit is contained in:
parent
1e56f5156a
commit
bb61697a58
@ -127,7 +127,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
var (
|
||||
opStats backupStats
|
||||
startTime = time.Now()
|
||||
detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
|
||||
detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService())
|
||||
)
|
||||
|
||||
op.Results.BackupID = model.StableID(uuid.NewString())
|
||||
|
||||
@ -120,7 +120,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
restoreID: uuid.NewString(),
|
||||
}
|
||||
start = time.Now()
|
||||
detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
|
||||
detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService())
|
||||
)
|
||||
|
||||
// -----
|
||||
|
||||
155
src/internal/streamstore/details.go
Normal file
155
src/internal/streamstore/details.go
Normal file
@ -0,0 +1,155 @@
|
||||
package streamstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type streamDetails struct {
|
||||
kw *kopia.Wrapper
|
||||
tenant string
|
||||
service path.ServiceType
|
||||
}
|
||||
|
||||
func NewDetails(
|
||||
kw *kopia.Wrapper,
|
||||
tenant string,
|
||||
service path.ServiceType,
|
||||
) *streamDetails {
|
||||
return &streamDetails{kw: kw, tenant: tenant, service: service}
|
||||
}
|
||||
|
||||
// WriteBackupDetails persists a `details.Details`
|
||||
// object in the stream store
|
||||
func (ss *streamDetails) WriteBackupDetails(
|
||||
ctx context.Context,
|
||||
backupDetails *details.Details,
|
||||
errs *fault.Bus,
|
||||
) (string, error) {
|
||||
// construct the path of the container for the `details` item
|
||||
p, err := path.Builder{}.
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
false)
|
||||
if err != nil {
|
||||
return "", clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
// TODO: We could use an io.Pipe here to avoid a double copy but that
|
||||
// makes error handling a bit complicated
|
||||
dbytes, err := json.Marshal(backupDetails)
|
||||
if err != nil {
|
||||
return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx)
|
||||
}
|
||||
|
||||
dc := &streamCollection{
|
||||
folderPath: p,
|
||||
item: &streamItem{
|
||||
name: detailsItemName,
|
||||
data: dbytes,
|
||||
},
|
||||
}
|
||||
|
||||
backupStats, _, _, err := ss.kw.BackupCollections(
|
||||
ctx,
|
||||
nil,
|
||||
[]data.BackupCollection{dc},
|
||||
nil,
|
||||
nil,
|
||||
false,
|
||||
errs)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "storing details in repository")
|
||||
}
|
||||
|
||||
return backupStats.SnapshotID, nil
|
||||
}
|
||||
|
||||
// ReadBackupDetails reads the specified details object
|
||||
// from the kopia repository
|
||||
func (ss *streamDetails) ReadBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
errs *fault.Bus,
|
||||
) (*details.Details, error) {
|
||||
// construct the path for the `details` item
|
||||
detailsPath, err := path.Builder{}.
|
||||
Append(detailsItemName).
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
var bc stats.ByteCounter
|
||||
|
||||
dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "retrieving backup details data")
|
||||
}
|
||||
|
||||
// Expect only 1 data collection
|
||||
if len(dcs) != 1 {
|
||||
return nil, clues.New("greater than 1 details data collection found").
|
||||
WithClues(ctx).
|
||||
With("collection_count", len(dcs))
|
||||
}
|
||||
|
||||
dc := dcs[0]
|
||||
|
||||
var d details.Details
|
||||
|
||||
found := false
|
||||
items := dc.Items(ctx, errs)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx)
|
||||
|
||||
case itemData, ok := <-items:
|
||||
if !ok {
|
||||
if !found {
|
||||
return nil, clues.New("no backup details found").WithClues(ctx)
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
err := json.NewDecoder(itemData.ToReader()).Decode(&d)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "decoding details data").WithClues(ctx)
|
||||
}
|
||||
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteBackupDetails deletes the specified details object from the kopia repository
|
||||
func (ss *streamDetails) DeleteBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
) error {
|
||||
err := ss.kw.DeleteSnapshot(ctx, detailsID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "deleting backup details")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -14,19 +14,19 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type StreamStoreIntegrationSuite struct {
|
||||
type StreamDetailsIntegrationSuite struct {
|
||||
tester.Suite
|
||||
}
|
||||
|
||||
func TestStreamStoreIntegrationSuite(t *testing.T) {
|
||||
suite.Run(t, &StreamStoreIntegrationSuite{
|
||||
func TestStreamDetailsIntegrationSuite(t *testing.T) {
|
||||
suite.Run(t, &StreamDetailsIntegrationSuite{
|
||||
Suite: tester.NewIntegrationSuite(
|
||||
t,
|
||||
[][]string{tester.AWSStorageCredEnvs}),
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *StreamStoreIntegrationSuite) TestDetails() {
|
||||
func (suite *StreamDetailsIntegrationSuite) TestDetails() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext()
|
||||
@ -55,13 +55,13 @@ func (suite *StreamStoreIntegrationSuite) TestDetails() {
|
||||
})
|
||||
|
||||
deets := deetsBuilder.Details()
|
||||
ss := New(kw, "tenant", path.ExchangeService)
|
||||
nd := NewDetails(kw, "tenant", path.ExchangeService)
|
||||
|
||||
id, err := ss.WriteBackupDetails(ctx, deets, fault.New(true))
|
||||
id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, id)
|
||||
|
||||
readDeets, err := ss.ReadBackupDetails(ctx, id, fault.New(true))
|
||||
readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, readDeets)
|
||||
|
||||
@ -5,34 +5,13 @@ package streamstore
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type streamStore struct {
|
||||
kw *kopia.Wrapper
|
||||
tenant string
|
||||
service path.ServiceType
|
||||
}
|
||||
|
||||
func New(
|
||||
kw *kopia.Wrapper,
|
||||
tenant string,
|
||||
service path.ServiceType,
|
||||
) *streamStore {
|
||||
return &streamStore{kw: kw, tenant: tenant, service: service}
|
||||
}
|
||||
|
||||
const (
|
||||
// detailsItemName is the name of the stream used to store
|
||||
// backup details
|
||||
@ -42,132 +21,6 @@ const (
|
||||
collectionPurposeDetails = "details"
|
||||
)
|
||||
|
||||
// WriteBackupDetails persists a `details.Details`
|
||||
// object in the stream store
|
||||
func (ss *streamStore) WriteBackupDetails(
|
||||
ctx context.Context,
|
||||
backupDetails *details.Details,
|
||||
errs *fault.Bus,
|
||||
) (string, error) {
|
||||
// construct the path of the container for the `details` item
|
||||
p, err := path.Builder{}.
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
false)
|
||||
if err != nil {
|
||||
return "", clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
// TODO: We could use an io.Pipe here to avoid a double copy but that
|
||||
// makes error handling a bit complicated
|
||||
dbytes, err := json.Marshal(backupDetails)
|
||||
if err != nil {
|
||||
return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx)
|
||||
}
|
||||
|
||||
dc := &streamCollection{
|
||||
folderPath: p,
|
||||
item: &streamItem{
|
||||
name: detailsItemName,
|
||||
data: dbytes,
|
||||
},
|
||||
}
|
||||
|
||||
backupStats, _, _, err := ss.kw.BackupCollections(
|
||||
ctx,
|
||||
nil,
|
||||
[]data.BackupCollection{dc},
|
||||
nil,
|
||||
nil,
|
||||
false,
|
||||
errs)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "storing details in repository")
|
||||
}
|
||||
|
||||
return backupStats.SnapshotID, nil
|
||||
}
|
||||
|
||||
// ReadBackupDetails reads the specified details object
|
||||
// from the kopia repository
|
||||
func (ss *streamStore) ReadBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
errs *fault.Bus,
|
||||
) (*details.Details, error) {
|
||||
// construct the path for the `details` item
|
||||
detailsPath, err := path.Builder{}.
|
||||
Append(detailsItemName).
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
var bc stats.ByteCounter
|
||||
|
||||
dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "retrieving backup details data")
|
||||
}
|
||||
|
||||
// Expect only 1 data collection
|
||||
if len(dcs) != 1 {
|
||||
return nil, clues.New("greater than 1 details data collection found").
|
||||
WithClues(ctx).
|
||||
With("collection_count", len(dcs))
|
||||
}
|
||||
|
||||
dc := dcs[0]
|
||||
|
||||
var d details.Details
|
||||
|
||||
found := false
|
||||
items := dc.Items(ctx, errs)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx)
|
||||
|
||||
case itemData, ok := <-items:
|
||||
if !ok {
|
||||
if !found {
|
||||
return nil, clues.New("no backup details found").WithClues(ctx)
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
err := json.NewDecoder(itemData.ToReader()).Decode(&d)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "decoding details data").WithClues(ctx)
|
||||
}
|
||||
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteBackupDetails deletes the specified details object from the kopia repository
|
||||
func (ss *streamStore) DeleteBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
) error {
|
||||
err := ss.kw.DeleteSnapshot(ctx, detailsID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "deleting backup details")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamCollection is a data.BackupCollection used to persist
|
||||
// a single data stream
|
||||
type streamCollection struct {
|
||||
|
||||
@ -351,7 +351,7 @@ func (r repository) BackupDetails(
|
||||
return nil, nil, errs.Fail(err)
|
||||
}
|
||||
|
||||
deets, err := streamstore.New(
|
||||
deets, err := streamstore.NewDetails(
|
||||
r.dataLayer,
|
||||
r.Account.ID(),
|
||||
b.Selector.PathService(),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user