From bb61697a582224827347459ab5f01a38f0ff0f8c Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 7 Mar 2023 16:19:26 -0700 Subject: [PATCH] migrate streamstore details to own file (#2725) Initial code movement before getting into larger refactor. The goal is to refactor the stream store so that it can support multiple different writers and readers beyond just the details.Details struct. This will allow us to use the stream store to ingest large lists of errors and fault items without burdening the backup manifest. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :broom: Tech Debt/Cleanup #### Issue(s) * #2708 #### Test Plan - [x] :zap: Unit test --- src/internal/operations/backup.go | 2 +- src/internal/operations/restore.go | 2 +- src/internal/streamstore/details.go | 155 ++++++++++++++++++ .../{streamstore_test.go => details_test.go} | 14 +- src/internal/streamstore/streamstore.go | 147 ----------------- src/pkg/repository/repository.go | 2 +- 6 files changed, 165 insertions(+), 157 deletions(-) create mode 100644 src/internal/streamstore/details.go rename src/internal/streamstore/{streamstore_test.go => details_test.go} (82%) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 152b29e20..3f477c437 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -127,7 +127,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { var ( opStats backupStats startTime = time.Now() - detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService()) + detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) ) op.Results.BackupID = model.StableID(uuid.NewString()) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 70c179625..2bc243bf4 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -120,7 +120,7 @@ func 
(op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De restoreID: uuid.NewString(), } start = time.Now() - detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService()) + detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) ) // ----- diff --git a/src/internal/streamstore/details.go b/src/internal/streamstore/details.go new file mode 100644 index 000000000..97ecf2e95 --- /dev/null +++ b/src/internal/streamstore/details.go @@ -0,0 +1,155 @@ +package streamstore + +import ( + "context" + "encoding/json" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" + "github.com/pkg/errors" +) + +type streamDetails struct { + kw *kopia.Wrapper + tenant string + service path.ServiceType +} + +func NewDetails( + kw *kopia.Wrapper, + tenant string, + service path.ServiceType, +) *streamDetails { + return &streamDetails{kw: kw, tenant: tenant, service: service} +} + +// WriteBackupDetails persists a `details.Details` +// object in the stream store +func (ss *streamDetails) WriteBackupDetails( + ctx context.Context, + backupDetails *details.Details, + errs *fault.Bus, +) (string, error) { + // construct the path of the container for the `details` item + p, err := path.Builder{}. 
+ ToStreamStorePath( + ss.tenant, + collectionPurposeDetails, + ss.service, + false) + if err != nil { + return "", clues.Stack(err).WithClues(ctx) + } + + // TODO: We could use an io.Pipe here to avoid a double copy but that + // makes error handling a bit complicated + dbytes, err := json.Marshal(backupDetails) + if err != nil { + return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) + } + + dc := &streamCollection{ + folderPath: p, + item: &streamItem{ + name: detailsItemName, + data: dbytes, + }, + } + + backupStats, _, _, err := ss.kw.BackupCollections( + ctx, + nil, + []data.BackupCollection{dc}, + nil, + nil, + false, + errs) + if err != nil { + return "", errors.Wrap(err, "storing details in repository") + } + + return backupStats.SnapshotID, nil +} + +// ReadBackupDetails reads the specified details object +// from the kopia repository +func (ss *streamDetails) ReadBackupDetails( + ctx context.Context, + detailsID string, + errs *fault.Bus, +) (*details.Details, error) { + // construct the path for the `details` item + detailsPath, err := path.Builder{}. + Append(detailsItemName). + ToStreamStorePath( + ss.tenant, + collectionPurposeDetails, + ss.service, + true, + ) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + var bc stats.ByteCounter + + dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) + if err != nil { + return nil, errors.Wrap(err, "retrieving backup details data") + } + + // Expect only 1 data collection + if len(dcs) != 1 { + return nil, clues.New("greater than 1 details data collection found"). + WithClues(ctx). 
+ With("collection_count", len(dcs)) + } + + dc := dcs[0] + + var d details.Details + + found := false + items := dc.Items(ctx, errs) + + for { + select { + case <-ctx.Done(): + return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) + + case itemData, ok := <-items: + if !ok { + if !found { + return nil, clues.New("no backup details found").WithClues(ctx) + } + + return &d, nil + } + + err := json.NewDecoder(itemData.ToReader()).Decode(&d) + if err != nil { + return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) + } + + found = true + } + } +} + +// DeleteBackupDetails deletes the specified details object from the kopia repository +func (ss *streamDetails) DeleteBackupDetails( + ctx context.Context, + detailsID string, +) error { + err := ss.kw.DeleteSnapshot(ctx, detailsID) + if err != nil { + return errors.Wrap(err, "deleting backup details") + } + + return nil +} diff --git a/src/internal/streamstore/streamstore_test.go b/src/internal/streamstore/details_test.go similarity index 82% rename from src/internal/streamstore/streamstore_test.go rename to src/internal/streamstore/details_test.go index 7223ce61c..6c668b590 100644 --- a/src/internal/streamstore/streamstore_test.go +++ b/src/internal/streamstore/details_test.go @@ -14,19 +14,19 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -type StreamStoreIntegrationSuite struct { +type StreamDetailsIntegrationSuite struct { tester.Suite } -func TestStreamStoreIntegrationSuite(t *testing.T) { - suite.Run(t, &StreamStoreIntegrationSuite{ +func TestStreamDetailsIntegrationSuite(t *testing.T) { + suite.Run(t, &StreamDetailsIntegrationSuite{ Suite: tester.NewIntegrationSuite( t, [][]string{tester.AWSStorageCredEnvs}), }) } -func (suite *StreamStoreIntegrationSuite) TestDetails() { +func (suite *StreamDetailsIntegrationSuite) TestDetails() { t := suite.T() ctx, flush := tester.NewContext() @@ -55,13 +55,13 @@ func (suite *StreamStoreIntegrationSuite) TestDetails() { }) 
deets := deetsBuilder.Details() - ss := New(kw, "tenant", path.ExchangeService) + nd := NewDetails(kw, "tenant", path.ExchangeService) - id, err := ss.WriteBackupDetails(ctx, deets, fault.New(true)) + id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true)) require.NoError(t, err) require.NotNil(t, id) - readDeets, err := ss.ReadBackupDetails(ctx, id, fault.New(true)) + readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true)) require.NoError(t, err) require.NotNil(t, readDeets) diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a9c7beeb5..5f5855027 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -5,34 +5,13 @@ package streamstore import ( "bytes" "context" - "encoding/json" "io" - "github.com/alcionai/clues" - "github.com/pkg/errors" - "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/kopia" - "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) -type streamStore struct { - kw *kopia.Wrapper - tenant string - service path.ServiceType -} - -func New( - kw *kopia.Wrapper, - tenant string, - service path.ServiceType, -) *streamStore { - return &streamStore{kw: kw, tenant: tenant, service: service} -} - const ( // detailsItemName is the name of the stream used to store // backup details @@ -42,132 +21,6 @@ const ( collectionPurposeDetails = "details" ) -// WriteBackupDetails persists a `details.Details` -// object in the stream store -func (ss *streamStore) WriteBackupDetails( - ctx context.Context, - backupDetails *details.Details, - errs *fault.Bus, -) (string, error) { - // construct the path of the container for the `details` item - p, err := path.Builder{}. 
- ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - false) - if err != nil { - return "", clues.Stack(err).WithClues(ctx) - } - - // TODO: We could use an io.Pipe here to avoid a double copy but that - // makes error handling a bit complicated - dbytes, err := json.Marshal(backupDetails) - if err != nil { - return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) - } - - dc := &streamCollection{ - folderPath: p, - item: &streamItem{ - name: detailsItemName, - data: dbytes, - }, - } - - backupStats, _, _, err := ss.kw.BackupCollections( - ctx, - nil, - []data.BackupCollection{dc}, - nil, - nil, - false, - errs) - if err != nil { - return "", errors.Wrap(err, "storing details in repository") - } - - return backupStats.SnapshotID, nil -} - -// ReadBackupDetails reads the specified details object -// from the kopia repository -func (ss *streamStore) ReadBackupDetails( - ctx context.Context, - detailsID string, - errs *fault.Bus, -) (*details.Details, error) { - // construct the path for the `details` item - detailsPath, err := path.Builder{}. - Append(detailsItemName). - ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - true, - ) - if err != nil { - return nil, clues.Stack(err).WithClues(ctx) - } - - var bc stats.ByteCounter - - dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) - if err != nil { - return nil, errors.Wrap(err, "retrieving backup details data") - } - - // Expect only 1 data collection - if len(dcs) != 1 { - return nil, clues.New("greater than 1 details data collection found"). - WithClues(ctx). 
- With("collection_count", len(dcs)) - } - - dc := dcs[0] - - var d details.Details - - found := false - items := dc.Items(ctx, errs) - - for { - select { - case <-ctx.Done(): - return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) - - case itemData, ok := <-items: - if !ok { - if !found { - return nil, clues.New("no backup details found").WithClues(ctx) - } - - return &d, nil - } - - err := json.NewDecoder(itemData.ToReader()).Decode(&d) - if err != nil { - return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) - } - - found = true - } - } -} - -// DeleteBackupDetails deletes the specified details object from the kopia repository -func (ss *streamStore) DeleteBackupDetails( - ctx context.Context, - detailsID string, -) error { - err := ss.kw.DeleteSnapshot(ctx, detailsID) - if err != nil { - return errors.Wrap(err, "deleting backup details") - } - - return nil -} - // streamCollection is a data.BackupCollection used to persist // a single data stream type streamCollection struct { diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 4975c9a03..b007ff5b5 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -351,7 +351,7 @@ func (r repository) BackupDetails( return nil, nil, errs.Fail(err) } - deets, err := streamstore.New( + deets, err := streamstore.NewDetails( r.dataLayer, r.Account.ID(), b.Selector.PathService(),