diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 152b29e20..3f477c437 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -127,7 +127,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { var ( opStats backupStats startTime = time.Now() - detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService()) + detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) ) op.Results.BackupID = model.StableID(uuid.NewString()) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 70c179625..2bc243bf4 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -120,7 +120,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De restoreID: uuid.NewString(), } start = time.Now() - detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService()) + detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) ) // ----- diff --git a/src/internal/streamstore/details.go b/src/internal/streamstore/details.go new file mode 100644 index 000000000..97ecf2e95 --- /dev/null +++ b/src/internal/streamstore/details.go @@ -0,0 +1,155 @@ +package streamstore + +import ( + "context" + "encoding/json" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" + "github.com/pkg/errors" +) + +type streamDetails struct { + kw *kopia.Wrapper + tenant string + service path.ServiceType +} + +func NewDetails( + kw *kopia.Wrapper, + tenant string, + service path.ServiceType, +) *streamDetails { + return &streamDetails{kw: kw, tenant: tenant, service: service} +} + +// WriteBackupDetails persists a `details.Details` +// object in the stream store +func (ss *streamDetails) WriteBackupDetails( + ctx context.Context, + backupDetails *details.Details, + errs *fault.Bus, +) (string, error) { + // construct the path of the container for the `details` item + p, err := path.Builder{}. + ToStreamStorePath( + ss.tenant, + collectionPurposeDetails, + ss.service, + false) + if err != nil { + return "", clues.Stack(err).WithClues(ctx) + } + + // TODO: We could use an io.Pipe here to avoid a double copy but that + // makes error handling a bit complicated + dbytes, err := json.Marshal(backupDetails) + if err != nil { + return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) + } + + dc := &streamCollection{ + folderPath: p, + item: &streamItem{ + name: detailsItemName, + data: dbytes, + }, + } + + backupStats, _, _, err := ss.kw.BackupCollections( + ctx, + nil, + []data.BackupCollection{dc}, + nil, + nil, + false, + errs) + if err != nil { + return "", errors.Wrap(err, "storing details in repository") + } + + return backupStats.SnapshotID, nil +} + +// ReadBackupDetails reads the specified details object +// from the kopia repository +func (ss *streamDetails) ReadBackupDetails( + ctx context.Context, + detailsID string, + errs *fault.Bus, +) (*details.Details, error) { + // construct the path for the `details` item + detailsPath, err := path.Builder{}. + Append(detailsItemName). + ToStreamStorePath( + ss.tenant, + collectionPurposeDetails, + ss.service, + true, + ) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + var bc stats.ByteCounter + + dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) + if err != nil { + return nil, errors.Wrap(err, "retrieving backup details data") + } + + // Expect only 1 data collection + if len(dcs) != 1 { + return nil, clues.New("greater than 1 details data collection found"). + WithClues(ctx). + With("collection_count", len(dcs)) + } + + dc := dcs[0] + + var d details.Details + + found := false + items := dc.Items(ctx, errs) + + for { + select { + case <-ctx.Done(): + return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) + + case itemData, ok := <-items: + if !ok { + if !found { + return nil, clues.New("no backup details found").WithClues(ctx) + } + + return &d, nil + } + + err := json.NewDecoder(itemData.ToReader()).Decode(&d) + if err != nil { + return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) + } + + found = true + } + } +} + +// DeleteBackupDetails deletes the specified details object from the kopia repository +func (ss *streamDetails) DeleteBackupDetails( + ctx context.Context, + detailsID string, +) error { + err := ss.kw.DeleteSnapshot(ctx, detailsID) + if err != nil { + return errors.Wrap(err, "deleting backup details") + } + + return nil +} diff --git a/src/internal/streamstore/streamstore_test.go b/src/internal/streamstore/details_test.go similarity index 82% rename from src/internal/streamstore/streamstore_test.go rename to src/internal/streamstore/details_test.go index 7223ce61c..6c668b590 100644 --- a/src/internal/streamstore/streamstore_test.go +++ b/src/internal/streamstore/details_test.go @@ -14,19 +14,19 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -type StreamStoreIntegrationSuite struct { +type StreamDetailsIntegrationSuite struct { tester.Suite } -func TestStreamStoreIntegrationSuite(t *testing.T) { - suite.Run(t, &StreamStoreIntegrationSuite{ +func TestStreamDetailsIntegrationSuite(t *testing.T) { + suite.Run(t, &StreamDetailsIntegrationSuite{ Suite: tester.NewIntegrationSuite( t, [][]string{tester.AWSStorageCredEnvs}), }) } -func (suite *StreamStoreIntegrationSuite) TestDetails() { +func (suite *StreamDetailsIntegrationSuite) TestDetails() { t := suite.T() ctx, flush := tester.NewContext() @@ -55,13 +55,13 @@ func (suite *StreamStoreIntegrationSuite) TestDetails() { }) deets := deetsBuilder.Details() - ss := New(kw, "tenant", path.ExchangeService) + nd := NewDetails(kw, "tenant", path.ExchangeService) - id, err := ss.WriteBackupDetails(ctx, deets, fault.New(true)) + id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true)) require.NoError(t, err) require.NotNil(t, id) - readDeets, err := ss.ReadBackupDetails(ctx, id, fault.New(true)) + readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true)) require.NoError(t, err) require.NotNil(t, readDeets) diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a9c7beeb5..5f5855027 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -5,34 +5,13 @@ package streamstore import ( "bytes" "context" - "encoding/json" "io" - "github.com/alcionai/clues" - "github.com/pkg/errors" - "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/kopia" - "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) -type streamStore struct { - kw *kopia.Wrapper - tenant string - service path.ServiceType -} - -func New( - kw *kopia.Wrapper, - tenant string, - service path.ServiceType, -) *streamStore { - return &streamStore{kw: kw, tenant: tenant, service: service} -} - const ( // detailsItemName is the name of the stream used to store // backup details @@ -42,132 +21,6 @@ const ( collectionPurposeDetails = "details" ) -// WriteBackupDetails persists a `details.Details` -// object in the stream store -func (ss *streamStore) WriteBackupDetails( - ctx context.Context, - backupDetails *details.Details, - errs *fault.Bus, -) (string, error) { - // construct the path of the container for the `details` item - p, err := path.Builder{}. - ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - false) - if err != nil { - return "", clues.Stack(err).WithClues(ctx) - } - - // TODO: We could use an io.Pipe here to avoid a double copy but that - // makes error handling a bit complicated - dbytes, err := json.Marshal(backupDetails) - if err != nil { - return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) - } - - dc := &streamCollection{ - folderPath: p, - item: &streamItem{ - name: detailsItemName, - data: dbytes, - }, - } - - backupStats, _, _, err := ss.kw.BackupCollections( - ctx, - nil, - []data.BackupCollection{dc}, - nil, - nil, - false, - errs) - if err != nil { - return "", errors.Wrap(err, "storing details in repository") - } - - return backupStats.SnapshotID, nil -} - -// ReadBackupDetails reads the specified details object -// from the kopia repository -func (ss *streamStore) ReadBackupDetails( - ctx context.Context, - detailsID string, - errs *fault.Bus, -) (*details.Details, error) { - // construct the path for the `details` item - detailsPath, err := path.Builder{}. - Append(detailsItemName). - ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - true, - ) - if err != nil { - return nil, clues.Stack(err).WithClues(ctx) - } - - var bc stats.ByteCounter - - dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) - if err != nil { - return nil, errors.Wrap(err, "retrieving backup details data") - } - - // Expect only 1 data collection - if len(dcs) != 1 { - return nil, clues.New("greater than 1 details data collection found"). - WithClues(ctx). - With("collection_count", len(dcs)) - } - - dc := dcs[0] - - var d details.Details - - found := false - items := dc.Items(ctx, errs) - - for { - select { - case <-ctx.Done(): - return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) - - case itemData, ok := <-items: - if !ok { - if !found { - return nil, clues.New("no backup details found").WithClues(ctx) - } - - return &d, nil - } - - err := json.NewDecoder(itemData.ToReader()).Decode(&d) - if err != nil { - return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) - } - - found = true - } - } -} - -// DeleteBackupDetails deletes the specified details object from the kopia repository -func (ss *streamStore) DeleteBackupDetails( - ctx context.Context, - detailsID string, -) error { - err := ss.kw.DeleteSnapshot(ctx, detailsID) - if err != nil { - return errors.Wrap(err, "deleting backup details") - } - - return nil -} - // streamCollection is a data.BackupCollection used to persist // a single data stream type streamCollection struct { diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 4975c9a03..b007ff5b5 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -351,7 +351,7 @@ func (r repository) BackupDetails( return nil, nil, errs.Fail(err) } - deets, err := streamstore.New( + deets, err := streamstore.NewDetails( r.dataLayer, r.Account.ID(), b.Selector.PathService(),