diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 3f477c437..946445584 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -97,10 +97,6 @@ type backupStats struct { resourceCount int } -type detailsWriter interface { - WriteBackupDetails(context.Context, *details.Details, *fault.Bus) (string, error) -} - // --------------------------------------------------------------------------- // Primary Controller // --------------------------------------------------------------------------- @@ -220,7 +216,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { func (op *BackupOperation) do( ctx context.Context, opStats *backupStats, - detailsStore detailsReader, + detailsStore streamstore.Streamer, backupID model.StableID, ) (*details.Builder, error) { reasons := selectorToReasons(op.Selectors) @@ -498,7 +494,7 @@ func matchesReason(reasons []kopia.Reason, p path.Path) bool { func mergeDetails( ctx context.Context, ms *store.Wrapper, - detailsStore detailsReader, + detailsStore streamstore.Streamer, mans []*kopia.ManifestEntry, shortRefsFromPrevBackup map[string]kopia.PrevRefs, deets *details.Builder, @@ -654,7 +650,7 @@ func (op *BackupOperation) persistResults( // stores the operation details, results, and selectors in the backup manifest. func (op *BackupOperation) createBackupModels( ctx context.Context, - detailsStore detailsWriter, + detailsStore streamstore.Writer, snapID string, backupID model.StableID, backupDetails *details.Details, @@ -665,7 +661,7 @@ func (op *BackupOperation) createBackupModels( return clues.New("no backup details to record").WithClues(ctx) } - detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails, op.Errors) + detailsID, err := detailsStore.Write(ctx, backupDetails, op.Errors) if err != nil { return clues.Wrap(err, "creating backupDetails model").WithClues(ctx) } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index f25af5198..2972706b6 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -18,6 +18,7 @@ import ( evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" + ssmock "github.com/alcionai/corso/src/internal/streamstore/mock" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup" @@ -108,26 +109,6 @@ func (mbu mockBackuper) BackupCollections( return &kopia.BackupStats{}, &details.Builder{}, nil, nil } -// ----- details - -type mockDetailsReader struct { - entries map[string]*details.Details -} - -func (mdr mockDetailsReader) ReadBackupDetails( - ctx context.Context, - detailsID string, - errs *fault.Bus, -) (*details.Details, error) { - r := mdr.entries[detailsID] - - if r == nil { - return nil, errors.Errorf("no details for ID %s", detailsID) - } - - return r, nil -} - // ----- model store for backups type mockBackupStorer struct { @@ -326,8 +307,7 @@ func makeDetailsEntry( assert.FailNowf( t, "service %s not supported in helper function", - p.Service().String(), - ) + p.Service().String()) } return res @@ -1221,14 +1201,14 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems ctx, flush := tester.NewContext() defer flush() - mdr := mockDetailsReader{entries: test.populatedDetails} + mds := ssmock.DetailsStreamer{Entries: test.populatedDetails} w := &store.Wrapper{Storer: mockBackupStorer{entries: test.populatedModels}} deets := details.Builder{} err := mergeDetails( ctx, w, - mdr, + mds, test.inputMans, test.inputShortRefsFromPrevBackup, &deets, @@ -1303,50 +1283,107 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsFolde backup1.ID: backup1, } - itemSize = 42 - itemDetails = makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false) - - populatedDetails = map[string]*details.Details{ - backup1.DetailsID: { - DetailsModel: details.DetailsModel{ - Entries: []details.DetailsEntry{ - *itemDetails, - }, - }, - }, - } - - expectedEntries = []details.DetailsEntry{ - *itemDetails, - } + itemSize = 42 + now = time.Now() + // later = now.Add(42 * time.Minute) ) - itemDetails.Exchange.Modified = time.Now() + itemDetails := makeDetailsEntry(t, itemPath1, itemPath1, itemSize, false) + // itemDetails.Exchange.Modified = now + + populatedDetails := map[string]*details.Details{ + backup1.DetailsID: { + DetailsModel: details.DetailsModel{ + Entries: []details.DetailsEntry{*itemDetails}, + }, + }, + } + + expectedEntries := []details.DetailsEntry{*itemDetails} + + // update the details + itemDetails.Exchange.Modified = now for i := 1; i < len(pathElems); i++ { expectedEntries = append(expectedEntries, *makeFolderEntry( t, path.Builder{}.Append(pathElems[:i]...), int64(itemSize), - itemDetails.Exchange.Modified, - )) + itemDetails.Exchange.Modified)) } ctx, flush := tester.NewContext() defer flush() - mdr := mockDetailsReader{entries: populatedDetails} - w := &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} - deets := details.Builder{} + var ( + mds = ssmock.DetailsStreamer{Entries: populatedDetails} + w = &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} + deets = details.Builder{} + ) err := mergeDetails( ctx, w, - mdr, + mds, inputMans, inputToMerge, &deets, fault.New(true)) assert.NoError(t, err) - assert.ElementsMatch(t, expectedEntries, deets.Details().Entries) + compareDeetEntries(t, expectedEntries, deets.Details().Entries) +} + +// compares two details slices. Useful for tests where serializing the +// entries can produce minor variations in the time struct, causing +// assert.elementsMatch to fail. +func compareDeetEntries(t *testing.T, expect, result []details.DetailsEntry) { + if !assert.Equal(t, len(expect), len(result), "entry slices should be equal len") { + require.ElementsMatch(t, expect, result) + } + + var ( + // repoRef -> modified time + eMods = map[string]time.Time{} + es = make([]details.DetailsEntry, 0, len(expect)) + rs = make([]details.DetailsEntry, 0, len(expect)) + ) + + for _, e := range expect { + eMods[e.RepoRef] = e.Modified() + es = append(es, withoutModified(e)) + } + + for _, r := range result { + // this comparison is an artifact of bad comparisons across time.Time + // serialization using assert.ElementsMatch. The time struct can produce + // differences in its `ext` value across serialization while the actual time + // reference remains the same. assert handles this poorly, whereas the time + // library provides successful comparison. + assert.Truef( + t, + eMods[r.RepoRef].Equal(r.Modified()), + "expected modified time %v, got %v", eMods[r.RepoRef], r.Modified()) + + rs = append(rs, withoutModified(r)) + } + + assert.ElementsMatch(t, es, rs) +} + +func withoutModified(de details.DetailsEntry) details.DetailsEntry { + switch { + case de.Exchange != nil: + de.Exchange.Modified = time.Time{} + + case de.OneDrive != nil: + de.OneDrive.Modified = time.Time{} + + case de.SharePoint != nil: + de.SharePoint.Modified = time.Time{} + + case de.Folder != nil: + de.Folder.Modified = time.Time{} + } + + return de } diff --git a/src/internal/operations/common.go b/src/internal/operations/common.go index 4b9caeb54..c590c9cc3 100644 --- a/src/internal/operations/common.go +++ b/src/internal/operations/common.go @@ -6,21 +6,18 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/store" ) -type detailsReader interface { - ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Bus) (*details.Details, error) -} - func getBackupAndDetailsFromID( ctx context.Context, backupID model.StableID, ms *store.Wrapper, - detailsStore detailsReader, + detailsStore streamstore.Reader, errs *fault.Bus, ) (*backup.Backup, *details.Details, error) { dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID) @@ -28,10 +25,14 @@ func getBackupAndDetailsFromID( return nil, nil, errors.Wrap(err, "getting backup details ID") } - deets, err := detailsStore.ReadBackupDetails(ctx, dID, errs) - if err != nil { + var ( + deets details.Details + umt = details.UnmarshalTo(&deets) + ) + + if err := detailsStore.Read(ctx, dID, umt, errs); err != nil { return nil, nil, errors.Wrap(err, "getting backup details data") } - return bup, deets, nil + return bup, &deets, nil } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 2bc243bf4..3e42d0d05 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -181,7 +181,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De func (op *RestoreOperation) do( ctx context.Context, opStats *restoreStats, - detailsStore detailsReader, + detailsStore streamstore.Reader, start time.Time, ) (*details.Details, error) { bup, deets, err := getBackupAndDetailsFromID( diff --git a/src/internal/streamstore/details.go b/src/internal/streamstore/details.go index 97ecf2e95..a9f919f68 100644 --- a/src/internal/streamstore/details.go +++ b/src/internal/streamstore/details.go @@ -2,24 +2,26 @@ package streamstore import ( "context" - "encoding/json" "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/pkg/errors" ) +var _ Streamer = &streamDetails{} + type streamDetails struct { kw *kopia.Wrapper tenant string service path.ServiceType } +// NewDetails creates a new storeStreamer for streaming +// details.Details structs. func NewDetails( kw *kopia.Wrapper, tenant string, @@ -28,13 +30,17 @@ func NewDetails( return &streamDetails{kw: kw, tenant: tenant, service: service} } -// WriteBackupDetails persists a `details.Details` -// object in the stream store -func (ss *streamDetails) WriteBackupDetails( - ctx context.Context, - backupDetails *details.Details, - errs *fault.Bus, -) (string, error) { +const ( + // detailsItemName is the name of the stream used to store + // backup details + detailsItemName = "details" + // collectionPurposeDetails is used to indicate + // what the collection is being used for + collectionPurposeDetails = "details" +) + +// Write persists a `details.Details` object in the stream store +func (ss *streamDetails) Write(ctx context.Context, deets Marshaller, errs *fault.Bus) (string, error) { // construct the path of the container for the `details` item p, err := path.Builder{}. ToStreamStorePath( @@ -48,7 +54,7 @@ func (ss *streamDetails) WriteBackupDetails( // TODO: We could use an io.Pipe here to avoid a double copy but that // makes error handling a bit complicated - dbytes, err := json.Marshal(backupDetails) + dbytes, err := deets.Marshal() if err != nil { return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) } @@ -76,13 +82,13 @@ func (ss *streamDetails) WriteBackupDetails( return backupStats.SnapshotID, nil } -// ReadBackupDetails reads the specified details object -// from the kopia repository -func (ss *streamDetails) ReadBackupDetails( +// Read reads a `details.Details` object from the kopia repository +func (ss *streamDetails) Read( ctx context.Context, detailsID string, + umr Unmarshaller, errs *fault.Bus, -) (*details.Details, error) { +) error { // construct the path for the `details` item detailsPath, err := path.Builder{}. Append(detailsItemName). @@ -93,47 +99,48 @@ func (ss *streamDetails) ReadBackupDetails( true, ) if err != nil { - return nil, clues.Stack(err).WithClues(ctx) + return clues.Stack(err).WithClues(ctx) } - var bc stats.ByteCounter - - dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) + dcs, err := ss.kw.RestoreMultipleItems( + ctx, + detailsID, + []path.Path{detailsPath}, + &stats.ByteCounter{}, + errs) if err != nil { - return nil, errors.Wrap(err, "retrieving backup details data") + return errors.Wrap(err, "retrieving backup details data") } // Expect only 1 data collection if len(dcs) != 1 { - return nil, clues.New("greater than 1 details data collection found"). + return clues.New("greater than 1 details collection found"). WithClues(ctx). With("collection_count", len(dcs)) } - dc := dcs[0] - - var d details.Details - - found := false - items := dc.Items(ctx, errs) + var ( + dc = dcs[0] + found = false + items = dc.Items(ctx, errs) + ) for { select { case <-ctx.Done(): - return nil, clues.New("context cancelled waiting for backup details data").WithClues(ctx) + return clues.New("context cancelled waiting for backup details data").WithClues(ctx) case itemData, ok := <-items: if !ok { if !found { - return nil, clues.New("no backup details found").WithClues(ctx) + return clues.New("no backup details found").WithClues(ctx) } - return &d, nil + return nil } - err := json.NewDecoder(itemData.ToReader()).Decode(&d) - if err != nil { - return nil, clues.Wrap(err, "decoding details data").WithClues(ctx) + if err := umr(itemData.ToReader()); err != nil { + return clues.Wrap(err, "unmarshalling details data").WithClues(ctx) } found = true @@ -141,11 +148,8 @@ func (ss *streamDetails) ReadBackupDetails( } } -// DeleteBackupDetails deletes the specified details object from the kopia repository -func (ss *streamDetails) DeleteBackupDetails( - ctx context.Context, - detailsID string, -) error { +// Delete deletes a `details.Details` object from the kopia repository +func (ss *streamDetails) Delete(ctx context.Context, detailsID string) error { err := ss.kw.DeleteSnapshot(ctx, detailsID) if err != nil { return errors.Wrap(err, "deleting backup details") diff --git a/src/internal/streamstore/details_test.go b/src/internal/streamstore/details_test.go index 6c668b590..118626d77 100644 --- a/src/internal/streamstore/details_test.go +++ b/src/internal/streamstore/details_test.go @@ -54,16 +54,19 @@ func (suite *StreamDetailsIntegrationSuite) TestDetails() { }, }) - deets := deetsBuilder.Details() - nd := NewDetails(kw, "tenant", path.ExchangeService) + var ( + deets = deetsBuilder.Details() + sd = NewDetails(kw, "tenant", path.ExchangeService) + ) - id, err := nd.WriteBackupDetails(ctx, deets, fault.New(true)) + id, err := sd.Write(ctx, deets, fault.New(true)) require.NoError(t, err) require.NotNil(t, id) - readDeets, err := nd.ReadBackupDetails(ctx, id, fault.New(true)) + var readDeets details.Details + err = sd.Read(ctx, id, details.UnmarshalTo(&readDeets), fault.New(true)) require.NoError(t, err) - require.NotNil(t, readDeets) + require.NotEmpty(t, readDeets) assert.Equal(t, len(deets.Entries), len(readDeets.Entries)) assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef) diff --git a/src/internal/streamstore/mock/details.go b/src/internal/streamstore/mock/details.go new file mode 100644 index 000000000..0a87d37df --- /dev/null +++ b/src/internal/streamstore/mock/details.go @@ -0,0 +1,48 @@ +package mock + +import ( + "bytes" + "context" + "io" + + "github.com/pkg/errors" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/streamstore" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" +) + +var _ streamstore.Streamer = &DetailsStreamer{} + +type DetailsStreamer struct { + Entries map[string]*details.Details +} + +func (ds DetailsStreamer) Read( + ctx context.Context, + detailsID string, + umr streamstore.Unmarshaller, + errs *fault.Bus, +) error { + r := ds.Entries[detailsID] + + if r == nil { + return errors.Errorf("no details for ID %s", detailsID) + } + + bs, err := r.Marshal() + if err != nil { + return err + } + + return umr(io.NopCloser(bytes.NewReader(bs))) +} + +func (ds DetailsStreamer) Write(context.Context, streamstore.Marshaller, *fault.Bus) (string, error) { + return "", clues.New("not implmented") +} + +func (ds DetailsStreamer) Delete(context.Context, string) error { + return clues.New("not implmented") +} diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 5f5855027..21e0baf05 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -12,14 +12,37 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -const ( - // detailsItemName is the name of the stream used to store - // backup details - detailsItemName = "details" - // collectionPurposeDetails is used to indicate - // what the collection is being used for - collectionPurposeDetails = "details" -) +// --------------------------------------------------------------------------- +// interfaces +// --------------------------------------------------------------------------- + +// Streamer is the core interface for all types of data streamed to and +// from the store. +type Streamer interface { + Writer + Reader + Delete(context.Context, string) error +} + +type Reader interface { + Read(context.Context, string, Unmarshaller, *fault.Bus) error +} + +type Writer interface { + Write(context.Context, Marshaller, *fault.Bus) (string, error) +} + +// Marshallers are used to convert structs into bytes to be persisted in the store. +type Marshaller interface { + Marshal() ([]byte, error) +} + +// Unmarshallers are used to serialize the bytes in the store into the original struct. +type Unmarshaller func(io.ReadCloser) error + +// --------------------------------------------------------------------------- +// collection +// --------------------------------------------------------------------------- // streamCollection is a data.BackupCollection used to persist // a single data stream @@ -56,6 +79,10 @@ func (dc *streamCollection) Items(context.Context, *fault.Bus) <-chan data.Strea return items } +// --------------------------------------------------------------------------- +// item +// --------------------------------------------------------------------------- + type streamItem struct { name string data []byte diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go index edb545cb2..152a03311 100644 --- a/src/pkg/backup/details/details.go +++ b/src/pkg/backup/details/details.go @@ -2,6 +2,8 @@ package details import ( "context" + "encoding/json" + "io" "strconv" "strings" "sync" @@ -305,6 +307,18 @@ func (d *Details) addFolder(folder folderEntry) { }) } +// Marshal complies with the marshaller interface in streamStore. +func (d *Details) Marshal() ([]byte, error) { + return json.Marshal(d) +} + +// UnmarshalTo produces a func that complies with the unmarshaller type in streamStore. +func UnmarshalTo(d *Details) func(io.ReadCloser) error { + return func(rc io.ReadCloser) error { + return json.NewDecoder(rc).Decode(d) + } +} + // -------------------------------------------------------------------------------- // Entry // -------------------------------------------------------------------------------- diff --git a/src/pkg/backup/details/details_test.go b/src/pkg/backup/details/details_test.go index fb75526a3..52eddfe04 100644 --- a/src/pkg/backup/details/details_test.go +++ b/src/pkg/backup/details/details_test.go @@ -1,6 +1,8 @@ package details import ( + "bytes" + "io" "testing" "time" @@ -997,3 +999,40 @@ func (suite *DetailsUnitSuite) TestFolderEntriesForPath() { }) } } + +func (suite *DetailsUnitSuite) TestDetails_Marshal() { + for _, test := range pathItemsTable { + suite.Run(test.name, func() { + d := &Details{DetailsModel: DetailsModel{ + Entries: test.ents, + }} + + bs, err := d.Marshal() + require.NoError(suite.T(), err) + assert.NotEmpty(suite.T(), bs) + }) + } +} + +func (suite *DetailsUnitSuite) TestUnarshalTo() { + for _, test := range pathItemsTable { + suite.Run(test.name, func() { + orig := &Details{DetailsModel: DetailsModel{ + Entries: test.ents, + }} + + bs, err := orig.Marshal() + require.NoError(suite.T(), err) + assert.NotEmpty(suite.T(), bs) + + var result Details + umt := UnmarshalTo(&result) + err = umt(io.NopCloser(bytes.NewReader(bs))) + + t := suite.T() + require.NoError(t, err) + require.NotNil(t, result) + assert.ElementsMatch(t, orig.Entries, result.Entries) + }) + } +} diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index b007ff5b5..d436d78e9 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -351,12 +351,14 @@ func (r repository) BackupDetails( return nil, nil, errs.Fail(err) } - deets, err := streamstore.NewDetails( + nd := streamstore.NewDetails( r.dataLayer, r.Account.ID(), b.Selector.PathService(), - ).ReadBackupDetails(ctx, dID, errs) - if err != nil { + ) + + var deets details.Details + if err := nd.Read(ctx, dID, details.UnmarshalTo(&deets), errs); err != nil { return nil, nil, errs.Fail(err) } @@ -371,7 +373,7 @@ func (r repository) BackupDetails( } } - return deets, b, errs + return &deets, b, errs } // DeleteBackup removes the backup from both the model store and the backup storage.