refactor streamstore for generic use (#2723)

Refactors the stream store so that it can comply with multiple different writers and readers beyond just the details.Details struct. This will allow us to use the stream store to ingest large lists of errors or fault items without burdening the backup manifest.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #2708

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-03-08 10:16:09 -07:00 committed by GitHub
parent b2c8c3eaa1
commit 0b2d0067ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 290 additions and 119 deletions

View File

@ -97,10 +97,6 @@ type backupStats struct {
resourceCount int
}
// detailsWriter is the minimal interface the backup operation needs in
// order to persist a details.Details value. The string returned by
// WriteBackupDetails is recorded by the caller as the details ID.
type detailsWriter interface {
WriteBackupDetails(context.Context, *details.Details, *fault.Bus) (string, error)
}
// ---------------------------------------------------------------------------
// Primary Controller
// ---------------------------------------------------------------------------
@ -220,7 +216,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
func (op *BackupOperation) do(
ctx context.Context,
opStats *backupStats,
detailsStore detailsReader,
detailsStore streamstore.Streamer,
backupID model.StableID,
) (*details.Builder, error) {
reasons := selectorToReasons(op.Selectors)
@ -498,7 +494,7 @@ func matchesReason(reasons []kopia.Reason, p path.Path) bool {
func mergeDetails(
ctx context.Context,
ms *store.Wrapper,
detailsStore detailsReader,
detailsStore streamstore.Streamer,
mans []*kopia.ManifestEntry,
shortRefsFromPrevBackup map[string]kopia.PrevRefs,
deets *details.Builder,
@ -654,7 +650,7 @@ func (op *BackupOperation) persistResults(
// stores the operation details, results, and selectors in the backup manifest.
func (op *BackupOperation) createBackupModels(
ctx context.Context,
detailsStore detailsWriter,
detailsStore streamstore.Writer,
snapID string,
backupID model.StableID,
backupDetails *details.Details,
@ -665,7 +661,7 @@ func (op *BackupOperation) createBackupModels(
return clues.New("no backup details to record").WithClues(ctx)
}
detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails, op.Errors)
detailsID, err := detailsStore.Write(ctx, backupDetails, op.Errors)
if err != nil {
return clues.Wrap(err, "creating backupDetails model").WithClues(ctx)
}

View File

@ -18,6 +18,7 @@ import (
evmock "github.com/alcionai/corso/src/internal/events/mock"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
ssmock "github.com/alcionai/corso/src/internal/streamstore/mock"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup"
@ -108,26 +109,6 @@ func (mbu mockBackuper) BackupCollections(
return &kopia.BackupStats{}, &details.Builder{}, nil, nil
}
// ----- details
// mockDetailsReader is a test double that serves details from an
// in-memory map.
type mockDetailsReader struct {
// entries maps a details ID to the details returned for that ID.
entries map[string]*details.Details
}
// ReadBackupDetails returns the in-memory details registered under
// detailsID, or an error when no entry exists for that ID. The ctx and
// errs arguments are not used.
func (mdr mockDetailsReader) ReadBackupDetails(
	ctx context.Context,
	detailsID string,
	errs *fault.Bus,
) (*details.Details, error) {
	if deets := mdr.entries[detailsID]; deets != nil {
		return deets, nil
	}

	return nil, errors.Errorf("no details for ID %s", detailsID)
}
// ----- model store for backups
type mockBackupStorer struct {
@ -326,8 +307,7 @@ func makeDetailsEntry(
assert.FailNowf(
t,
"service %s not supported in helper function",
p.Service().String(),
)
p.Service().String())
}
return res
@ -1221,14 +1201,14 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems
ctx, flush := tester.NewContext()
defer flush()
mdr := mockDetailsReader{entries: test.populatedDetails}
mds := ssmock.DetailsStreamer{Entries: test.populatedDetails}
w := &store.Wrapper{Storer: mockBackupStorer{entries: test.populatedModels}}
deets := details.Builder{}
err := mergeDetails(
ctx,
w,
mdr,
mds,
test.inputMans,
test.inputShortRefsFromPrevBackup,
&deets,
@ -1303,50 +1283,107 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsFolde
backup1.ID: backup1,
}
itemSize = 42
itemDetails = makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false)
populatedDetails = map[string]*details.Details{
backup1.DetailsID: {
DetailsModel: details.DetailsModel{
Entries: []details.DetailsEntry{
*itemDetails,
},
},
},
}
expectedEntries = []details.DetailsEntry{
*itemDetails,
}
itemSize = 42
now = time.Now()
// later = now.Add(42 * time.Minute)
)
itemDetails.Exchange.Modified = time.Now()
itemDetails := makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false)
// itemDetails.Exchange.Modified = now
populatedDetails := map[string]*details.Details{
backup1.DetailsID: {
DetailsModel: details.DetailsModel{
Entries: []details.DetailsEntry{*itemDetails},
},
},
}
expectedEntries := []details.DetailsEntry{*itemDetails}
// update the details
itemDetails.Exchange.Modified = now
for i := 1; i < len(pathElems); i++ {
expectedEntries = append(expectedEntries, *makeFolderEntry(
t,
path.Builder{}.Append(pathElems[:i]...),
int64(itemSize),
itemDetails.Exchange.Modified,
))
itemDetails.Exchange.Modified))
}
ctx, flush := tester.NewContext()
defer flush()
mdr := mockDetailsReader{entries: populatedDetails}
w := &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}}
deets := details.Builder{}
var (
mds = ssmock.DetailsStreamer{Entries: populatedDetails}
w = &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}}
deets = details.Builder{}
)
err := mergeDetails(
ctx,
w,
mdr,
mds,
inputMans,
inputToMerge,
&deets,
fault.New(true))
assert.NoError(t, err)
assert.ElementsMatch(t, expectedEntries, deets.Details().Entries)
compareDeetEntries(t, expectedEntries, deets.Details().Entries)
}
// compareDeetEntries checks that two details slices hold the same entries.
// Modified times are compared separately with time.Time.Equal, keyed by
// RepoRef, and are then stripped with withoutModified before the remaining
// fields are compared with assert.ElementsMatch. Useful for tests where
// serializing the entries can produce minor variations in the time struct,
// causing assert.ElementsMatch to fail on otherwise equal values.
func compareDeetEntries(t *testing.T, expect, result []details.DetailsEntry) {
	sameLen := assert.Equal(t, len(expect), len(result), "entry slices should be equal len")
	if !sameLen {
		// slices of different length can never match; let require report
		// the difference and fail the test immediately.
		require.ElementsMatch(t, expect, result)
	}

	// repoRef -> expected modified time
	wantMods := make(map[string]time.Time)
	wantEntries := make([]details.DetailsEntry, 0, len(expect))
	gotEntries := make([]details.DetailsEntry, 0, len(expect))

	for _, want := range expect {
		// record the modified time before it gets stripped.
		wantMods[want.RepoRef] = want.Modified()
		wantEntries = append(wantEntries, withoutModified(want))
	}

	for _, got := range result {
		// this comparison is an artifact of bad comparisons across time.Time
		// serialization using assert.ElementsMatch. The time struct can produce
		// differences in its `ext` value across serialization while the actual
		// time reference remains the same. assert handles this poorly, whereas
		// the time library provides successful comparison.
		wantMod := wantMods[got.RepoRef]
		gotMod := got.Modified()

		assert.Truef(
			t,
			wantMod.Equal(gotMod),
			"expected modified time %v, got %v", wantMod, gotMod)

		gotEntries = append(gotEntries, withoutModified(got))
	}

	assert.ElementsMatch(t, wantEntries, gotEntries)
}
// withoutModified returns a copy of de whose info struct has its Modified
// time reset to the zero value. Only the first non-nil info among Exchange,
// OneDrive, SharePoint, and Folder is changed.
//
// The info fields are pointers, so assigning through them would also zero
// the Modified time on the caller's entry (and on any other entry sharing
// the same pointer). The info struct is therefore copied before it is
// altered, leaving the caller's data untouched.
func withoutModified(de details.DetailsEntry) details.DetailsEntry {
	switch {
	case de.Exchange != nil:
		info := *de.Exchange
		info.Modified = time.Time{}
		de.Exchange = &info

	case de.OneDrive != nil:
		info := *de.OneDrive
		info.Modified = time.Time{}
		de.OneDrive = &info

	case de.SharePoint != nil:
		info := *de.SharePoint
		info.Modified = time.Time{}
		de.SharePoint = &info

	case de.Folder != nil:
		info := *de.Folder
		info.Modified = time.Time{}
		de.Folder = &info
	}

	return de
}

View File

@ -6,21 +6,18 @@ import (
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/streamstore"
"github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/store"
)
// detailsReader is the minimal interface needed to load the
// details.Details stored under a given details ID.
type detailsReader interface {
ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Bus) (*details.Details, error)
}
func getBackupAndDetailsFromID(
ctx context.Context,
backupID model.StableID,
ms *store.Wrapper,
detailsStore detailsReader,
detailsStore streamstore.Reader,
errs *fault.Bus,
) (*backup.Backup, *details.Details, error) {
dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID)
@ -28,10 +25,14 @@ func getBackupAndDetailsFromID(
return nil, nil, errors.Wrap(err, "getting backup details ID")
}
deets, err := detailsStore.ReadBackupDetails(ctx, dID, errs)
if err != nil {
var (
deets details.Details
umt = details.UnmarshalTo(&deets)
)
if err := detailsStore.Read(ctx, dID, umt, errs); err != nil {
return nil, nil, errors.Wrap(err, "getting backup details data")
}
return bup, deets, nil
return bup, &deets, nil
}

View File

@ -181,7 +181,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
func (op *RestoreOperation) do(
ctx context.Context,
opStats *restoreStats,
detailsStore detailsReader,
detailsStore streamstore.Reader,
start time.Time,
) (*details.Details, error) {
bup, deets, err := getBackupAndDetailsFromID(

View File

@ -2,24 +2,26 @@ package streamstore
import (
"context"
"encoding/json"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/pkg/errors"
)
var _ Streamer = &streamDetails{}
// streamDetails is the Streamer implementation constructed by NewDetails.
type streamDetails struct {
// kw is the kopia wrapper used to restore and delete stored items.
kw *kopia.Wrapper
// tenant and service are supplied to NewDetails; presumably they scope
// the stream store path — confirm against the path construction in
// Write and Read.
tenant string
service path.ServiceType
}
// NewDetails creates a new storeStreamer for streaming
// details.Details structs.
func NewDetails(
kw *kopia.Wrapper,
tenant string,
@ -28,13 +30,17 @@ func NewDetails(
return &streamDetails{kw: kw, tenant: tenant, service: service}
}
// WriteBackupDetails persists a `details.Details`
// object in the stream store
func (ss *streamDetails) WriteBackupDetails(
ctx context.Context,
backupDetails *details.Details,
errs *fault.Bus,
) (string, error) {
const (
// detailsItemName is the name of the stream used to store
// backup details
detailsItemName = "details"
// collectionPurposeDetails is used to indicate
// what the collection is being used for
collectionPurposeDetails = "details"
)
// Write persists a `details.Details` object in the stream store
func (ss *streamDetails) Write(ctx context.Context, deets Marshaller, errs *fault.Bus) (string, error) {
// construct the path of the container for the `details` item
p, err := path.Builder{}.
ToStreamStorePath(
@ -48,7 +54,7 @@ func (ss *streamDetails) WriteBackupDetails(
// TODO: We could use an io.Pipe here to avoid a double copy but that
// makes error handling a bit complicated
dbytes, err := json.Marshal(backupDetails)
dbytes, err := deets.Marshal()
if err != nil {
return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx)
}
@ -76,13 +82,13 @@ func (ss *streamDetails) WriteBackupDetails(
return backupStats.SnapshotID, nil
}
// ReadBackupDetails reads the specified details object
// from the kopia repository
func (ss *streamDetails) ReadBackupDetails(
// Read reads a `details.Details` object from the kopia repository
func (ss *streamDetails) Read(
ctx context.Context,
detailsID string,
umr Unmarshaller,
errs *fault.Bus,
) (*details.Details, error) {
) error {
// construct the path for the `details` item
detailsPath, err := path.Builder{}.
Append(detailsItemName).
@ -93,47 +99,48 @@ func (ss *streamDetails) ReadBackupDetails(
true,
)
if err != nil {
return nil, clues.Stack(err).WithClues(ctx)
return clues.Stack(err).WithClues(ctx)
}
var bc stats.ByteCounter
dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs)
dcs, err := ss.kw.RestoreMultipleItems(
ctx,
detailsID,
[]path.Path{detailsPath},
&stats.ByteCounter{},
errs)
if err != nil {
return nil, errors.Wrap(err, "retrieving backup details data")
return errors.Wrap(err, "retrieving backup details data")
}
// Expect only 1 data collection
if len(dcs) != 1 {
return nil, clues.New("greater than 1 details data collection found").
return clues.New("greater than 1 details collection found").
WithClues(ctx).
With("collection_count", len(dcs))
}
dc := dcs[0]
var d details.Details
found := false
items := dc.Items(ctx, errs)
var (
dc = dcs[0]
found = false
items = dc.Items(ctx, errs)
)
for {
select {
case <-ctx.Done():
return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx)
return clues.New("context cancelled waiting for backup details data").WithClues(ctx)
case itemData, ok := <-items:
if !ok {
if !found {
return nil, clues.New("no backup details found").WithClues(ctx)
return clues.New("no backup details found").WithClues(ctx)
}
return &d, nil
return nil
}
err := json.NewDecoder(itemData.ToReader()).Decode(&d)
if err != nil {
return nil, clues.Wrap(err, "decoding details data").WithClues(ctx)
if err := umr(itemData.ToReader()); err != nil {
return clues.Wrap(err, "unmarshalling details data").WithClues(ctx)
}
found = true
@ -141,11 +148,8 @@ func (ss *streamDetails) ReadBackupDetails(
}
}
// DeleteBackupDetails deletes the specified details object from the kopia repository
func (ss *streamDetails) DeleteBackupDetails(
ctx context.Context,
detailsID string,
) error {
// Delete deletes a `details.Details` object from the kopia repository
func (ss *streamDetails) Delete(ctx context.Context, detailsID string) error {
err := ss.kw.DeleteSnapshot(ctx, detailsID)
if err != nil {
return errors.Wrap(err, "deleting backup details")

View File

@ -54,16 +54,19 @@ func (suite *StreamDetailsIntegrationSuite) TestDetails() {
},
})
deets := deetsBuilder.Details()
nd := NewDetails(kw, "tenant", path.ExchangeService)
var (
deets = deetsBuilder.Details()
sd = NewDetails(kw, "tenant", path.ExchangeService)
)
id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true))
id, err := sd.Write(ctx, deets, fault.New(true))
require.NoError(t, err)
require.NotNil(t, id)
readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true))
var readDeets details.Details
err = sd.Read(ctx, id, details.UnmarshalTo(&readDeets), fault.New(true))
require.NoError(t, err)
require.NotNil(t, readDeets)
require.NotEmpty(t, readDeets)
assert.Equal(t, len(deets.Entries), len(readDeets.Entries))
assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef)

View File

@ -0,0 +1,48 @@
package mock
import (
"bytes"
"context"
"io"
"github.com/pkg/errors"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/streamstore"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
)
var _ streamstore.Streamer = &DetailsStreamer{}
// DetailsStreamer is a mock streamstore.Streamer that serves details
// from an in-memory map.
type DetailsStreamer struct {
// Entries maps a details ID to the details that Read serves for it.
Entries map[string]*details.Details
}
// Read looks up the details registered under detailsID, marshals them to
// bytes, and hands those bytes to umr. It returns an error when no entry
// exists for the ID, when marshalling fails, or when umr itself fails.
// The ctx and errs arguments are not used.
func (ds DetailsStreamer) Read(
	ctx context.Context,
	detailsID string,
	umr streamstore.Unmarshaller,
	errs *fault.Bus,
) error {
	deets := ds.Entries[detailsID]
	if deets == nil {
		return errors.Errorf("no details for ID %s", detailsID)
	}

	payload, err := deets.Marshal()
	if err != nil {
		return err
	}

	// round-trip through the serialized form so that the unmarshaller
	// receives bytes instead of the stored struct.
	rc := io.NopCloser(bytes.NewReader(payload))

	return umr(rc)
}
// Write is not supported by the mock; it always returns an error.
func (ds DetailsStreamer) Write(context.Context, streamstore.Marshaller, *fault.Bus) (string, error) {
	return "", clues.New("not implemented")
}
// Delete is not supported by the mock; it always returns an error.
func (ds DetailsStreamer) Delete(context.Context, string) error {
	return clues.New("not implemented")
}

View File

@ -12,14 +12,37 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)
const (
// detailsItemName is the name of the stream used to store
// backup details
detailsItemName = "details"
// collectionPurposeDetails is used to indicate
// what the collection is being used for
collectionPurposeDetails = "details"
)
// ---------------------------------------------------------------------------
// interfaces
// ---------------------------------------------------------------------------
// Streamer is the core interface for all types of data streamed to and
// from the store. It combines Writer and Reader with the ability to
// delete a previously stored item.
type Streamer interface {
Writer
Reader
// Delete removes the stored item identified by the given ID.
Delete(context.Context, string) error
}
// Reader retrieves an item from the store. Read takes the ID of the
// stored item and passes the item's data to the Unmarshaller; the decoded
// value is surfaced through the Unmarshaller rather than returned.
type Reader interface {
Read(context.Context, string, Unmarshaller, *fault.Bus) error
}
// Writer persists an item in the store. Write stores the bytes produced
// by the Marshaller and returns an ID string for the stored item.
type Writer interface {
Write(context.Context, Marshaller, *fault.Bus) (string, error)
}
// Marshaller is implemented by any struct that can convert itself into
// bytes to be persisted in the store.
type Marshaller interface {
Marshal() ([]byte, error)
}
// Unmarshaller is a func that deserializes the bytes read from the store
// back into the original struct.
type Unmarshaller func(io.ReadCloser) error
// ---------------------------------------------------------------------------
// collection
// ---------------------------------------------------------------------------
// streamCollection is a data.BackupCollection used to persist
// a single data stream
@ -56,6 +79,10 @@ func (dc *streamCollection) Items(context.Context, *fault.Bus) <-chan data.Strea
return items
}
// ---------------------------------------------------------------------------
// item
// ---------------------------------------------------------------------------
type streamItem struct {
name string
data []byte

View File

@ -2,6 +2,8 @@ package details
import (
"context"
"encoding/json"
"io"
"strconv"
"strings"
"sync"
@ -305,6 +307,18 @@ func (d *Details) addFolder(folder folderEntry) {
})
}
// Marshal serializes the Details as JSON. It complies with the
// marshaller interface in streamStore.
func (d *Details) Marshal() ([]byte, error) {
	bs, err := json.Marshal(d)
	return bs, err
}
// UnmarshalTo produces a func that complies with the unmarshaller type in
// streamStore. The returned func JSON-decodes the reader's contents into d
// and closes the reader before returning, whether or not decoding
// succeeded. The decode error, if any, is what gets returned.
func UnmarshalTo(d *Details) func(io.ReadCloser) error {
	return func(rc io.ReadCloser) error {
		// The func is handed a ReadCloser and is its final consumer, so it
		// must release it. A failure to close a fully-consumed read stream
		// is deliberately ignored so that it cannot mask a successful decode.
		defer rc.Close()

		return json.NewDecoder(rc).Decode(d)
	}
}
// --------------------------------------------------------------------------------
// Entry
// --------------------------------------------------------------------------------

View File

@ -1,6 +1,8 @@
package details
import (
"bytes"
"io"
"testing"
"time"
@ -997,3 +999,40 @@ func (suite *DetailsUnitSuite) TestFolderEntriesForPath() {
})
}
}
// TestDetails_Marshal verifies that Details built from each set of
// entries in pathItemsTable marshal without error into a non-empty
// payload.
func (suite *DetailsUnitSuite) TestDetails_Marshal() {
	for _, test := range pathItemsTable {
		suite.Run(test.name, func() {
			t := suite.T()
			deets := Details{DetailsModel: DetailsModel{Entries: test.ents}}

			payload, err := deets.Marshal()
			require.NoError(t, err)
			assert.NotEmpty(t, payload)
		})
	}
}
// TestUnarshalTo round-trips Details built from each set of entries in
// pathItemsTable through Marshal and UnmarshalTo, and verifies that the
// decoded entries match the originals.
func (suite *DetailsUnitSuite) TestUnarshalTo() {
	for _, test := range pathItemsTable {
		suite.Run(test.name, func() {
			t := suite.T()
			orig := Details{DetailsModel: DetailsModel{Entries: test.ents}}

			payload, err := orig.Marshal()
			require.NoError(t, err)
			assert.NotEmpty(t, payload)

			var result Details

			rc := io.NopCloser(bytes.NewReader(payload))
			err = UnmarshalTo(&result)(rc)

			require.NoError(t, err)
			require.NotNil(t, result)
			assert.ElementsMatch(t, orig.Entries, result.Entries)
		})
	}
}

View File

@ -351,12 +351,14 @@ func (r repository) BackupDetails(
return nil, nil, errs.Fail(err)
}
deets, err := streamstore.NewDetails(
nd := streamstore.NewDetails(
r.dataLayer,
r.Account.ID(),
b.Selector.PathService(),
).ReadBackupDetails(ctx, dID, errs)
if err != nil {
)
var deets details.Details
if err := nd.Read(ctx, dID, details.UnmarshalTo(&deets), errs); err != nil {
return nil, nil, errs.Fail(err)
}
@ -371,7 +373,7 @@ func (r repository) BackupDetails(
}
}
return deets, b, errs
return &deets, b, errs
}
// DeleteBackup removes the backup from both the model store and the backup storage.