From 0b2d0067ee7d1676e62579d363f7264a052b5034 Mon Sep 17 00:00:00 2001 From: Keepers Date: Wed, 8 Mar 2023 10:16:09 -0700 Subject: [PATCH] refactor streamstore for generic use (#2723) Refactors the stream store so that it can comply with multiple different writers and readers beyond just the details.Details struct. This will allow us to use the stream store to ingest large lists of errors fault items without burdening the backup manifest. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :broom: Tech Debt/Cleanup #### Issue(s) * #2708 #### Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- src/internal/operations/backup.go | 12 +- src/internal/operations/backup_test.go | 133 +++++++++++++++-------- src/internal/operations/common.go | 17 +-- src/internal/operations/restore.go | 2 +- src/internal/streamstore/details.go | 78 ++++++------- src/internal/streamstore/details_test.go | 13 ++- src/internal/streamstore/mock/details.go | 48 ++++++++ src/internal/streamstore/streamstore.go | 43 ++++++-- src/pkg/backup/details/details.go | 14 +++ src/pkg/backup/details/details_test.go | 39 +++++++ src/pkg/repository/repository.go | 10 +- 11 files changed, 290 insertions(+), 119 deletions(-) create mode 100644 src/internal/streamstore/mock/details.go diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 3f477c437..946445584 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -97,10 +97,6 @@ type backupStats struct { resourceCount int } -type detailsWriter interface { - WriteBackupDetails(context.Context, *details.Details, *fault.Bus) (string, error) -} - // --------------------------------------------------------------------------- // Primary Controller // --------------------------------------------------------------------------- @@ -220,7 +216,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { func (op *BackupOperation) do( 
ctx context.Context, opStats *backupStats, - detailsStore detailsReader, + detailsStore streamstore.Streamer, backupID model.StableID, ) (*details.Builder, error) { reasons := selectorToReasons(op.Selectors) @@ -498,7 +494,7 @@ func matchesReason(reasons []kopia.Reason, p path.Path) bool { func mergeDetails( ctx context.Context, ms *store.Wrapper, - detailsStore detailsReader, + detailsStore streamstore.Streamer, mans []*kopia.ManifestEntry, shortRefsFromPrevBackup map[string]kopia.PrevRefs, deets *details.Builder, @@ -654,7 +650,7 @@ func (op *BackupOperation) persistResults( // stores the operation details, results, and selectors in the backup manifest. func (op *BackupOperation) createBackupModels( ctx context.Context, - detailsStore detailsWriter, + detailsStore streamstore.Writer, snapID string, backupID model.StableID, backupDetails *details.Details, @@ -665,7 +661,7 @@ func (op *BackupOperation) createBackupModels( return clues.New("no backup details to record").WithClues(ctx) } - detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails, op.Errors) + detailsID, err := detailsStore.Write(ctx, backupDetails, op.Errors) if err != nil { return clues.Wrap(err, "creating backupDetails model").WithClues(ctx) } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index f25af5198..2972706b6 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -18,6 +18,7 @@ import ( evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" + ssmock "github.com/alcionai/corso/src/internal/streamstore/mock" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup" @@ -108,26 +109,6 @@ func (mbu mockBackuper) BackupCollections( return &kopia.BackupStats{}, &details.Builder{}, nil, nil } -// ----- details - -type 
mockDetailsReader struct { - entries map[string]*details.Details -} - -func (mdr mockDetailsReader) ReadBackupDetails( - ctx context.Context, - detailsID string, - errs *fault.Bus, -) (*details.Details, error) { - r := mdr.entries[detailsID] - - if r == nil { - return nil, errors.Errorf("no details for ID %s", detailsID) - } - - return r, nil -} - // ----- model store for backups type mockBackupStorer struct { @@ -326,8 +307,7 @@ func makeDetailsEntry( assert.FailNowf( t, "service %s not supported in helper function", - p.Service().String(), - ) + p.Service().String()) } return res @@ -1221,14 +1201,14 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems ctx, flush := tester.NewContext() defer flush() - mdr := mockDetailsReader{entries: test.populatedDetails} + mds := ssmock.DetailsStreamer{Entries: test.populatedDetails} w := &store.Wrapper{Storer: mockBackupStorer{entries: test.populatedModels}} deets := details.Builder{} err := mergeDetails( ctx, w, - mdr, + mds, test.inputMans, test.inputShortRefsFromPrevBackup, &deets, @@ -1303,50 +1283,107 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsFolde backup1.ID: backup1, } - itemSize = 42 - itemDetails = makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false) - - populatedDetails = map[string]*details.Details{ - backup1.DetailsID: { - DetailsModel: details.DetailsModel{ - Entries: []details.DetailsEntry{ - *itemDetails, - }, - }, - }, - } - - expectedEntries = []details.DetailsEntry{ - *itemDetails, - } + itemSize = 42 + now = time.Now() + // later = now.Add(42 * time.Minute) ) - itemDetails.Exchange.Modified = time.Now() + itemDetails := makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false) + // itemDetails.Exchange.Modified = now + + populatedDetails := map[string]*details.Details{ + backup1.DetailsID: { + DetailsModel: details.DetailsModel{ + Entries: []details.DetailsEntry{*itemDetails}, + }, + }, + } + + expectedEntries := 
[]details.DetailsEntry{*itemDetails} + + // update the details + itemDetails.Exchange.Modified = now for i := 1; i < len(pathElems); i++ { expectedEntries = append(expectedEntries, *makeFolderEntry( t, path.Builder{}.Append(pathElems[:i]...), int64(itemSize), - itemDetails.Exchange.Modified, - )) + itemDetails.Exchange.Modified)) } ctx, flush := tester.NewContext() defer flush() - mdr := mockDetailsReader{entries: populatedDetails} - w := &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} - deets := details.Builder{} + var ( + mds = ssmock.DetailsStreamer{Entries: populatedDetails} + w = &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} + deets = details.Builder{} + ) err := mergeDetails( ctx, w, - mdr, + mds, inputMans, inputToMerge, &deets, fault.New(true)) assert.NoError(t, err) - assert.ElementsMatch(t, expectedEntries, deets.Details().Entries) + compareDeetEntries(t, expectedEntries, deets.Details().Entries) +} + +// compares two details slices. Useful for tests where serializing the +// entries can produce minor variations in the time struct, causing +// assert.elementsMatch to fail. +func compareDeetEntries(t *testing.T, expect, result []details.DetailsEntry) { + if !assert.Equal(t, len(expect), len(result), "entry slices should be equal len") { + require.ElementsMatch(t, expect, result) + } + + var ( + // repoRef -> modified time + eMods = map[string]time.Time{} + es = make([]details.DetailsEntry, 0, len(expect)) + rs = make([]details.DetailsEntry, 0, len(expect)) + ) + + for _, e := range expect { + eMods[e.RepoRef] = e.Modified() + es = append(es, withoutModified(e)) + } + + for _, r := range result { + // this comparison is an artifact of bad comparisons across time.Time + // serialization using assert.ElementsMatch. The time struct can produce + // differences in its `ext` value across serialization while the actual time + // reference remains the same. 
assert handles this poorly, whereas the time + // library provides successful comparison. + assert.Truef( + t, + eMods[r.RepoRef].Equal(r.Modified()), + "expected modified time %v, got %v", eMods[r.RepoRef], r.Modified()) + + rs = append(rs, withoutModified(r)) + } + + assert.ElementsMatch(t, es, rs) +} + +func withoutModified(de details.DetailsEntry) details.DetailsEntry { + switch { + case de.Exchange != nil: + de.Exchange.Modified = time.Time{} + + case de.OneDrive != nil: + de.OneDrive.Modified = time.Time{} + + case de.SharePoint != nil: + de.SharePoint.Modified = time.Time{} + + case de.Folder != nil: + de.Folder.Modified = time.Time{} + } + + return de } diff --git a/src/internal/operations/common.go b/src/internal/operations/common.go index 4b9caeb54..c590c9cc3 100644 --- a/src/internal/operations/common.go +++ b/src/internal/operations/common.go @@ -6,21 +6,18 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/store" ) -type detailsReader interface { - ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Bus) (*details.Details, error) -} - func getBackupAndDetailsFromID( ctx context.Context, backupID model.StableID, ms *store.Wrapper, - detailsStore detailsReader, + detailsStore streamstore.Reader, errs *fault.Bus, ) (*backup.Backup, *details.Details, error) { dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID) @@ -28,10 +25,14 @@ func getBackupAndDetailsFromID( return nil, nil, errors.Wrap(err, "getting backup details ID") } - deets, err := detailsStore.ReadBackupDetails(ctx, dID, errs) - if err != nil { + var ( + deets details.Details + umt = details.UnmarshalTo(&deets) + ) + + if err := detailsStore.Read(ctx, dID, umt, errs); err != nil { return nil, nil, errors.Wrap(err, 
"getting backup details data") } - return bup, deets, nil + return bup, &deets, nil } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 2bc243bf4..3e42d0d05 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -181,7 +181,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De func (op *RestoreOperation) do( ctx context.Context, opStats *restoreStats, - detailsStore detailsReader, + detailsStore streamstore.Reader, start time.Time, ) (*details.Details, error) { bup, deets, err := getBackupAndDetailsFromID( diff --git a/src/internal/streamstore/details.go b/src/internal/streamstore/details.go index 97ecf2e95..a9f919f68 100644 --- a/src/internal/streamstore/details.go +++ b/src/internal/streamstore/details.go @@ -2,24 +2,26 @@ package streamstore import ( "context" - "encoding/json" "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/pkg/errors" ) +var _ Streamer = &streamDetails{} + type streamDetails struct { kw *kopia.Wrapper tenant string service path.ServiceType } +// NewDetails creates a new storeStreamer for streaming +// details.Details structs. 
func NewDetails( kw *kopia.Wrapper, tenant string, @@ -28,13 +30,17 @@ func NewDetails( return &streamDetails{kw: kw, tenant: tenant, service: service} } -// WriteBackupDetails persists a `details.Details` -// object in the stream store -func (ss *streamDetails) WriteBackupDetails( - ctx context.Context, - backupDetails *details.Details, - errs *fault.Bus, -) (string, error) { +const ( + // detailsItemName is the name of the stream used to store + // backup details + detailsItemName = "details" + // collectionPurposeDetails is used to indicate + // what the collection is being used for + collectionPurposeDetails = "details" +) + +// Write persists a `details.Details` object in the stream store +func (ss *streamDetails) Write(ctx context.Context, deets Marshaller, errs *fault.Bus) (string, error) { // construct the path of the container for the `details` item p, err := path.Builder{}. ToStreamStorePath( @@ -48,7 +54,7 @@ func (ss *streamDetails) WriteBackupDetails( // TODO: We could use an io.Pipe here to avoid a double copy but that // makes error handling a bit complicated - dbytes, err := json.Marshal(backupDetails) + dbytes, err := deets.Marshal() if err != nil { return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) } @@ -76,13 +82,13 @@ func (ss *streamDetails) WriteBackupDetails( return backupStats.SnapshotID, nil } -// ReadBackupDetails reads the specified details object -// from the kopia repository -func (ss *streamDetails) ReadBackupDetails( +// Read reads a `details.Details` object from the kopia repository +func (ss *streamDetails) Read( ctx context.Context, detailsID string, + umr Unmarshaller, errs *fault.Bus, -) (*details.Details, error) { +) error { // construct the path for the `details` item detailsPath, err := path.Builder{}. Append(detailsItemName). 
@@ -93,47 +99,48 @@ func (ss *streamDetails) ReadBackupDetails( true, ) if err != nil { - return nil, clues.Stack(err).WithClues(ctx) + return clues.Stack(err).WithClues(ctx) } - var bc stats.ByteCounter - - dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) + dcs, err := ss.kw.RestoreMultipleItems( + ctx, + detailsID, + []path.Path{detailsPath}, + &stats.ByteCounter{}, + errs) if err != nil { - return nil, errors.Wrap(err, "retrieving backup details data") + return errors.Wrap(err, "retrieving backup details data") } // Expect only 1 data collection if len(dcs) != 1 { - return nil, clues.New("greater than 1 details data collection found"). + return clues.New("greater than 1 details collection found"). WithClues(ctx). With("collection_count", len(dcs)) } - dc := dcs[0] - - var d details.Details - - found := false - items := dc.Items(ctx, errs) + var ( + dc = dcs[0] + found = false + items = dc.Items(ctx, errs) + ) for { select { case <-ctx.Done(): - return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) + return clues.New("context cancelled waiting for backup details data").WithClues(ctx) case itemData, ok := <-items: if !ok { if !found { - return nil, clues.New("no backup details found").WithClues(ctx) + return clues.New("no backup details found").WithClues(ctx) } - return &d, nil + return nil } - err := json.NewDecoder(itemData.ToReader()).Decode(&d) - if err != nil { - return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) + if err := umr(itemData.ToReader()); err != nil { + return clues.Wrap(err, "unmarshalling details data").WithClues(ctx) } found = true @@ -141,11 +148,8 @@ func (ss *streamDetails) ReadBackupDetails( } } -// DeleteBackupDetails deletes the specified details object from the kopia repository -func (ss *streamDetails) DeleteBackupDetails( - ctx context.Context, - detailsID string, -) error { +// Delete deletes a `details.Details` object from the kopia 
repository +func (ss *streamDetails) Delete(ctx context.Context, detailsID string) error { err := ss.kw.DeleteSnapshot(ctx, detailsID) if err != nil { return errors.Wrap(err, "deleting backup details") diff --git a/src/internal/streamstore/details_test.go b/src/internal/streamstore/details_test.go index 6c668b590..118626d77 100644 --- a/src/internal/streamstore/details_test.go +++ b/src/internal/streamstore/details_test.go @@ -54,16 +54,19 @@ func (suite *StreamDetailsIntegrationSuite) TestDetails() { }, }) - deets := deetsBuilder.Details() - nd := NewDetails(kw, "tenant", path.ExchangeService) + var ( + deets = deetsBuilder.Details() + sd = NewDetails(kw, "tenant", path.ExchangeService) + ) - id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true)) + id, err := sd.Write(ctx, deets, fault.New(true)) require.NoError(t, err) require.NotNil(t, id) - readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true)) + var readDeets details.Details + err = sd.Read(ctx, id, details.UnmarshalTo(&readDeets), fault.New(true)) require.NoError(t, err) - require.NotNil(t, readDeets) + require.NotEmpty(t, readDeets) assert.Equal(t, len(deets.Entries), len(readDeets.Entries)) assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef) diff --git a/src/internal/streamstore/mock/details.go b/src/internal/streamstore/mock/details.go new file mode 100644 index 000000000..0a87d37df --- /dev/null +++ b/src/internal/streamstore/mock/details.go @@ -0,0 +1,48 @@ +package mock + +import ( + "bytes" + "context" + "io" + + "github.com/pkg/errors" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/streamstore" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" +) + +var _ streamstore.Streamer = &DetailsStreamer{} + +type DetailsStreamer struct { + Entries map[string]*details.Details +} + +func (ds DetailsStreamer) Read( + ctx context.Context, + detailsID string, + umr streamstore.Unmarshaller, + errs 
*fault.Bus,
+) error {
+	r := ds.Entries[detailsID]
+
+	if r == nil {
+		return errors.Errorf("no details for ID %s", detailsID)
+	}
+
+	bs, err := r.Marshal()
+	if err != nil {
+		return err
+	}
+
+	return umr(io.NopCloser(bytes.NewReader(bs)))
+}
+
+func (ds DetailsStreamer) Write(context.Context, streamstore.Marshaller, *fault.Bus) (string, error) {
+	return "", clues.New("not implemented")
+}
+
+func (ds DetailsStreamer) Delete(context.Context, string) error {
+	return clues.New("not implemented")
+}
diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go
index 5f5855027..21e0baf05 100644
--- a/src/internal/streamstore/streamstore.go
+++ b/src/internal/streamstore/streamstore.go
@@ -12,14 +12,37 @@ import (
 	"github.com/alcionai/corso/src/pkg/path"
 )
 
-const (
-	// detailsItemName is the name of the stream used to store
-	// backup details
-	detailsItemName = "details"
-	// collectionPurposeDetails is used to indicate
-	// what the collection is being used for
-	collectionPurposeDetails = "details"
-)
+// ---------------------------------------------------------------------------
+// interfaces
+// ---------------------------------------------------------------------------
+
+// Streamer is the core interface for all types of data streamed to and
+// from the store.
+type Streamer interface {
+	Writer
+	Reader
+	Delete(context.Context, string) error
+}
+
+type Reader interface {
+	Read(context.Context, string, Unmarshaller, *fault.Bus) error
+}
+
+type Writer interface {
+	Write(context.Context, Marshaller, *fault.Bus) (string, error)
+}
+
+// Marshallers are used to convert structs into bytes to be persisted in the store.
+type Marshaller interface {
+	Marshal() ([]byte, error)
+}
+
+// Unmarshallers are used to deserialize the bytes in the store into the original struct.
+type Unmarshaller func(io.ReadCloser) error + +// --------------------------------------------------------------------------- +// collection +// --------------------------------------------------------------------------- // streamCollection is a data.BackupCollection used to persist // a single data stream @@ -56,6 +79,10 @@ func (dc *streamCollection) Items(context.Context, *fault.Bus) <-chan data.Strea return items } +// --------------------------------------------------------------------------- +// item +// --------------------------------------------------------------------------- + type streamItem struct { name string data []byte diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go index edb545cb2..152a03311 100644 --- a/src/pkg/backup/details/details.go +++ b/src/pkg/backup/details/details.go @@ -2,6 +2,8 @@ package details import ( "context" + "encoding/json" + "io" "strconv" "strings" "sync" @@ -305,6 +307,18 @@ func (d *Details) addFolder(folder folderEntry) { }) } +// Marshal complies with the marshaller interface in streamStore. +func (d *Details) Marshal() ([]byte, error) { + return json.Marshal(d) +} + +// UnmarshalTo produces a func that complies with the unmarshaller type in streamStore. 
+func UnmarshalTo(d *Details) func(io.ReadCloser) error {
+	return func(rc io.ReadCloser) error {
+		return json.NewDecoder(rc).Decode(d)
+	}
+}
+
 // --------------------------------------------------------------------------------
 // Entry
 // --------------------------------------------------------------------------------
diff --git a/src/pkg/backup/details/details_test.go b/src/pkg/backup/details/details_test.go
index fb75526a3..52eddfe04 100644
--- a/src/pkg/backup/details/details_test.go
+++ b/src/pkg/backup/details/details_test.go
@@ -1,6 +1,8 @@
 package details
 
 import (
+	"bytes"
+	"io"
 	"testing"
 	"time"
 
@@ -997,3 +999,40 @@ func (suite *DetailsUnitSuite) TestFolderEntriesForPath() {
 		})
 	}
 }
+
+func (suite *DetailsUnitSuite) TestDetails_Marshal() {
+	for _, test := range pathItemsTable {
+		suite.Run(test.name, func() {
+			d := &Details{DetailsModel: DetailsModel{
+				Entries: test.ents,
+			}}
+
+			bs, err := d.Marshal()
+			require.NoError(suite.T(), err)
+			assert.NotEmpty(suite.T(), bs)
+		})
+	}
+}
+
+func (suite *DetailsUnitSuite) TestUnmarshalTo() {
+	for _, test := range pathItemsTable {
+		suite.Run(test.name, func() {
+			orig := &Details{DetailsModel: DetailsModel{
+				Entries: test.ents,
+			}}
+
+			bs, err := orig.Marshal()
+			require.NoError(suite.T(), err)
+			assert.NotEmpty(suite.T(), bs)
+
+			var result Details
+			umt := UnmarshalTo(&result)
+			err = umt(io.NopCloser(bytes.NewReader(bs)))
+
+			t := suite.T()
+			require.NoError(t, err)
+			require.NotNil(t, result)
+			assert.ElementsMatch(t, orig.Entries, result.Entries)
+		})
+	}
+}
diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go
index b007ff5b5..d436d78e9 100644
--- a/src/pkg/repository/repository.go
+++ b/src/pkg/repository/repository.go
@@ -351,12 +351,14 @@ func (r repository) BackupDetails(
 		return nil, nil, errs.Fail(err)
 	}
 
-	deets, err := streamstore.NewDetails(
+	nd := streamstore.NewDetails(
 		r.dataLayer,
 		r.Account.ID(),
 		b.Selector.PathService(),
-	).ReadBackupDetails(ctx, dID, errs)
-
if err != nil { + ) + + var deets details.Details + if err := nd.Read(ctx, dID, details.UnmarshalTo(&deets), errs); err != nil { return nil, nil, errs.Fail(err) } @@ -371,7 +373,7 @@ func (r repository) BackupDetails( } } - return deets, b, errs + return &deets, b, errs } // DeleteBackup removes the backup from both the model store and the backup storage.