* Fix the log message emitted during hierarchy merging so it's less confusing.
* Add a reason for making a snapshot. This can be used to separate out later messages into item data snapshot and details snapshot messages during debugging (a short sketch of the idea follows the checklists below).

---

#### Does this PR need a docs update or release note?

- [ ] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] ⛔ No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x] 💪 Manual
- [ ] ⚡ Unit test
- [ ] 💚 E2E
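The mechanism is context tagging, as in `Write` below where the snapshot type is attached to the context. A minimal sketch of the idea; the key and values here are illustrative assumptions, not necessarily the exact strings added by this change:

```go
package streamstore // hypothetical placement; illustration only

import (
	"context"

	"github.com/alcionai/clues"
)

// addSnapshotReason attaches the reason for taking a snapshot to the context,
// so downstream log/error messages decorated with this ctx can be separated
// by reason (e.g. item data vs. details) during debugging. The key and values
// are assumptions for illustration.
func addSnapshotReason(ctx context.Context, reason string) context.Context {
	return clues.Add(ctx, "snapshot_reason", reason)
}
```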
// streamstore implements helpers to store large
// data streams in a repository
package streamstore

import (
	"bytes"
	"context"
	"io"

	"github.com/alcionai/clues"

	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/internal/kopia"
	"github.com/alcionai/corso/src/internal/kopia/inject"
	"github.com/alcionai/corso/src/internal/stats"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/path"
)

// ---------------------------------------------------------------------------
// controller
// ---------------------------------------------------------------------------

var _ Streamer = &storeStreamer{}

type storeStreamer struct {
	kw      *kopia.Wrapper
	tenant  string
	service path.ServiceType
	dbcs    []data.BackupCollection
}

// NewStreamer creates a new streamstore Streamer for writing metadata file
// streams to the store.
func NewStreamer(
	kw *kopia.Wrapper,
	tenant string,
	service path.ServiceType,
) *storeStreamer {
	return &storeStreamer{
		kw:      kw,
		tenant:  tenant,
		service: service,
	}
}

// Collect eagerly serializes the marshalable bytes in the collectable into a
// data.BackupCollection. The collection is stored within the storeStreamer
// for persistence when Write is called.
func (ss *storeStreamer) Collect(ctx context.Context, col Collectable) error {
	cs, err := collect(ctx, ss.tenant, ss.service, col)
	if err != nil {
		return clues.Wrap(err, "collecting data for stream store")
	}

	ss.dbcs = append(ss.dbcs, cs)

	return nil
}

// Write persists the collected objects in the stream store.
func (ss *storeStreamer) Write(ctx context.Context, errs *fault.Bus) (string, error) {
	ctx = clues.Add(ctx, "snapshot_type", "stream store")

	id, err := write(ctx, ss.kw, ss.dbcs, errs)
	if err != nil {
		return "", clues.Wrap(err, "writing to stream store")
	}

	return id, nil
}

// Read reads a collectable object from the kopia repository.
func (ss *storeStreamer) Read(ctx context.Context, snapshotID string, col Collectable, errs *fault.Bus) error {
	err := read(ctx, snapshotID, ss.tenant, ss.service, col, ss.kw, errs)
	if err != nil {
		return clues.Wrap(err, "reading from stream store")
	}

	return nil
}
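
// persistCollectables is a hypothetical usage sketch, not part of this
// package's API: it shows the intended call pattern for the Streamer above,
// collecting each marshalable object first and then flushing them all into a
// single stream store snapshot with one Write. The Collectable values are
// assumed to be built elsewhere; their constructors are not defined in this
// file.
func persistCollectables(
	ctx context.Context,
	kw *kopia.Wrapper,
	tenantID string,
	service path.ServiceType,
	cols []Collectable,
	errs *fault.Bus,
) (string, error) {
	ss := NewStreamer(kw, tenantID, service)

	// serialize each collectable into an in-memory backup collection
	for _, c := range cols {
		if err := ss.Collect(ctx, c); err != nil {
			return "", clues.Wrap(err, "collecting streamstore data")
		}
	}

	// persist everything gathered above as a single snapshot and return its ID
	return ss.Write(ctx, errs)
}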

// ---------------------------------------------------------------------------
// interfaces
// ---------------------------------------------------------------------------

// Streamer is the core interface for all types of data streamed to and
// from the store.
type Streamer interface {
	Collector
	Writer
	Reader
}

type CollectorWriter interface {
	Collector
	Writer
}

type Collector interface {
	Collect(context.Context, Collectable) error
}

type Reader interface {
	Read(context.Context, string, Collectable, *fault.Bus) error
}

type Writer interface {
	Write(context.Context, *fault.Bus) (string, error)
}

// Marshallers are used to convert structs into bytes to be persisted in the store.
type Marshaller interface {
	Marshal() ([]byte, error)
}

// Unmarshallers are used to deserialize the bytes in the store back into the original struct.
type Unmarshaller func(io.ReadCloser) error
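
// rawBlob and readAllInto are hypothetical examples, not used anywhere in
// corso, illustrating the Marshaller/Unmarshaller pair this package expects:
// Marshal produces the bytes to persist, and the Unmarshaller consumes the
// stored bytes when they are read back.
type rawBlob struct {
	bs []byte
}

// Marshal satisfies Marshaller by returning the wrapped bytes unchanged.
func (r rawBlob) Marshal() ([]byte, error) {
	return r.bs, nil
}

// readAllInto returns an Unmarshaller that copies the persisted bytes into dst.
func readAllInto(dst *[]byte) Unmarshaller {
	return func(rc io.ReadCloser) error {
		defer rc.Close()

		bs, err := io.ReadAll(rc)
		if err != nil {
			return clues.Wrap(err, "reading persisted bytes")
		}

		*dst = bs

		return nil
	}
}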

// ---------------------------------------------------------------------------
// collection
// ---------------------------------------------------------------------------

// streamCollection is a data.BackupCollection used to persist
// a single data stream.
type streamCollection struct {
	// folderPath indicates what level in the hierarchy this collection
	// represents
	folderPath path.Path
	item       *streamItem
}

func (dc *streamCollection) FullPath() path.Path {
	return dc.folderPath
}

func (dc *streamCollection) PreviousPath() path.Path {
	return nil
}

func (dc *streamCollection) State() data.CollectionState {
	return data.NewState
}

func (dc *streamCollection) DoNotMergeItems() bool {
	return false
}

// Items() always returns a channel with a single data.Item
// representing the object to be persisted.
func (dc *streamCollection) Items(context.Context, *fault.Bus) <-chan data.Item {
	items := make(chan data.Item, 1)
	defer close(items)
	items <- dc.item

	return items
}

// ---------------------------------------------------------------------------
// item
// ---------------------------------------------------------------------------

type streamItem struct {
	name string
	data []byte
}

func (di *streamItem) ID() string {
	return di.name
}

func (di *streamItem) ToReader() io.ReadCloser {
	return io.NopCloser(bytes.NewReader(di.data))
}

func (di *streamItem) Deleted() bool {
	return false
}

// ---------------------------------------------------------------------------
// common reader/writer/deleter
// ---------------------------------------------------------------------------

// collect marshals the collectable's bytes and wraps them in a
// data.BackupCollection.
func collect(
	ctx context.Context,
	tenantID string,
	service path.ServiceType,
	col Collectable,
) (data.BackupCollection, error) {
	// construct the path of the container
	p, err := path.Builder{}.ToStreamStorePath(tenantID, col.purpose, service, false)
	if err != nil {
		return nil, clues.Stack(err).WithClues(ctx)
	}

	// TODO: We could use an io.Pipe here to avoid a double copy but that
	// makes error handling a bit complicated
	bs, err := col.mr.Marshal()
	if err != nil {
		return nil, clues.Wrap(err, "marshalling body").WithClues(ctx)
	}

	dc := streamCollection{
		folderPath: p,
		item: &streamItem{
			name: col.itemName,
			data: bs,
		},
	}

	return &dc, nil
}

// write persists bytes to the store.
func write(
	ctx context.Context,
	bup inject.BackupConsumer,
	dbcs []data.BackupCollection,
	errs *fault.Bus,
) (string, error) {
	ctx = clues.Add(ctx, "collection_source", "streamstore")

	backupStats, _, _, err := bup.ConsumeBackupCollections(
		ctx,
		nil,
		nil,
		dbcs,
		prefixmatcher.NopReader[map[string]struct{}](),
		nil,
		false,
		errs)
	if err != nil {
		return "", clues.Wrap(err, "storing marshalled bytes in repository")
	}

	return backupStats.SnapshotID, nil
}

// read retrieves an object from the store
func read(
	ctx context.Context,
	snapshotID string,
	tenantID string,
	service path.ServiceType,
	col Collectable,
	rer inject.RestoreProducer,
	errs *fault.Bus,
) error {
	// construct the path of the container
	p, err := path.Builder{}.
		Append(col.itemName).
		ToStreamStorePath(tenantID, col.purpose, service, true)
	if err != nil {
		return clues.Stack(err).WithClues(ctx)
	}

	pd, err := p.Dir()
	if err != nil {
		return clues.Stack(err).WithClues(ctx)
	}

	ctx = clues.Add(ctx, "snapshot_id", snapshotID)

	cs, err := rer.ProduceRestoreCollections(
		ctx,
		snapshotID,
		[]path.RestorePaths{
			{
				StoragePath: p,
				RestorePath: pd,
			},
		},
		&stats.ByteCounter{},
		errs)
	if err != nil {
		return clues.Wrap(err, "retrieving data")
	}

	// Expect only 1 data collection
	if len(cs) != 1 {
		return clues.New("unexpected collection count").
			WithClues(ctx).
			With("collection_count", len(cs))
	}

	var (
		c     = cs[0]
		found = false
		items = c.Items(ctx, errs)
	)

	for {
		select {
		case <-ctx.Done():
			return clues.New("context cancelled waiting for data").WithClues(ctx)

		case itemData, ok := <-items:
			if !ok {
				if !found {
					return clues.New("no data found").WithClues(ctx)
				}

				return nil
			}

			if err := col.Unmr(itemData.ToReader()); err != nil {
				return clues.Wrap(err, "unmarshalling data").WithClues(ctx)
			}

			found = true
		}
	}
}
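
// restoreCollectable is a hypothetical usage sketch mirroring read above, not
// part of this package's API: given a snapshot ID and the same Collectable
// definition that was written, Read locates the stored item and hands its
// bytes to the collectable's Unmarshaller, which hydrates the caller's struct.
func restoreCollectable(
	ctx context.Context,
	kw *kopia.Wrapper,
	tenantID string,
	service path.ServiceType,
	snapshotID string,
	col Collectable,
	errs *fault.Bus,
) error {
	ss := NewStreamer(kw, tenantID, service)

	return ss.Read(ctx, snapshotID, col, errs)
}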