Wire details merging during BackupOp.Run (#1905)
## Description Actually call the details merge function while performing a backup. If no base snapshots or `len(toMerge) == 0` then turns into a noop. This PR contains a little extra refactoring to keep from initializing a struct multiple times. However, long-term we should probably refactor BackupOp to better fit the fact that we are now using interfaces as inputs to many functions for the sake of testing ## Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No ## Type of change - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) * closes #1800 ## Test Plan - [x] 💪 Manual - [ ] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
ff1b54b551
commit
35c1b10912
@ -122,7 +122,7 @@ func (w Wrapper) BackupCollections(
|
|||||||
service path.ServiceType,
|
service path.ServiceType,
|
||||||
oc *OwnersCats,
|
oc *OwnersCats,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) (*BackupStats, *details.Details, map[string]path.Path, error) {
|
) (*BackupStats, *details.Builder, map[string]path.Path, error) {
|
||||||
if w.c == nil {
|
if w.c == nil {
|
||||||
return nil, nil, nil, errNotConnected
|
return nil, nil, nil, errNotConnected
|
||||||
}
|
}
|
||||||
@ -131,7 +131,7 @@ func (w Wrapper) BackupCollections(
|
|||||||
defer end()
|
defer end()
|
||||||
|
|
||||||
if len(collections) == 0 {
|
if len(collections) == 0 {
|
||||||
return &BackupStats{}, (&details.Builder{}).Details(), nil, nil
|
return &BackupStats{}, &details.Builder{}, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
progress := &corsoProgress{
|
progress := &corsoProgress{
|
||||||
@ -140,9 +140,7 @@ func (w Wrapper) BackupCollections(
|
|||||||
toMerge: map[string]path.Path{},
|
toMerge: map[string]path.Path{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(ashmrtn): Pass previousSnapshots here to enable building the directory
|
dirTree, err := inflateDirTree(ctx, w.c, previousSnapshots, collections, progress)
|
||||||
// hierarchy with them.
|
|
||||||
dirTree, err := inflateDirTree(ctx, w.c, nil, collections, progress)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, errors.Wrap(err, "building kopia directories")
|
return nil, nil, nil, errors.Wrap(err, "building kopia directories")
|
||||||
}
|
}
|
||||||
@ -159,7 +157,7 @@ func (w Wrapper) BackupCollections(
|
|||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, progress.deets.Details(), progress.toMerge, nil
|
return s, progress.deets, progress.toMerge, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w Wrapper) makeSnapshotWithRoot(
|
func (w Wrapper) makeSnapshotWithRoot(
|
||||||
|
|||||||
@ -285,7 +285,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
|||||||
// 47 file and 6 folder entries.
|
// 47 file and 6 folder entries.
|
||||||
assert.Len(
|
assert.Len(
|
||||||
t,
|
t,
|
||||||
deets.Entries,
|
deets.Details().Entries,
|
||||||
test.expectedUploadedFiles+test.expectedCachedFiles+6,
|
test.expectedUploadedFiles+test.expectedCachedFiles+6,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -444,7 +444,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
|
|||||||
assert.Equal(t, 1, stats.IgnoredErrorCount)
|
assert.Equal(t, 1, stats.IgnoredErrorCount)
|
||||||
assert.False(t, stats.Incomplete)
|
assert.False(t, stats.Incomplete)
|
||||||
// 5 file and 6 folder entries.
|
// 5 file and 6 folder entries.
|
||||||
assert.Len(t, deets.Entries, 5+6)
|
assert.Len(t, deets.Details().Entries, 5+6)
|
||||||
}
|
}
|
||||||
|
|
||||||
type backedupFile struct {
|
type backedupFile struct {
|
||||||
@ -484,7 +484,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, BackupStats{}, *s)
|
assert.Equal(t, BackupStats{}, *s)
|
||||||
assert.Empty(t, d.Entries)
|
assert.Empty(t, d.Details().Entries)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -649,7 +649,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
|||||||
require.Equal(t, stats.IgnoredErrorCount, 0)
|
require.Equal(t, stats.IgnoredErrorCount, 0)
|
||||||
require.False(t, stats.Incomplete)
|
require.False(t, stats.Incomplete)
|
||||||
// 6 file and 6 folder entries.
|
// 6 file and 6 folder entries.
|
||||||
assert.Len(t, deets.Entries, expectedFiles+expectedDirs)
|
assert.Len(t, deets.Details().Entries, expectedFiles+expectedDirs)
|
||||||
|
|
||||||
suite.snapshotID = manifest.ID(stats.SnapshotID)
|
suite.snapshotID = manifest.ID(stats.SnapshotID)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -88,6 +88,10 @@ type backupStats struct {
|
|||||||
readErr, writeErr error
|
readErr, writeErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type detailsWriter interface {
|
||||||
|
WriteBackupDetails(context.Context, *details.Details) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Run begins a synchronous backup operation.
|
// Run begins a synchronous backup operation.
|
||||||
func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||||
ctx, end := D.Span(ctx, "operations:backup:run")
|
ctx, end := D.Span(ctx, "operations:backup:run")
|
||||||
@ -95,9 +99,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
opStats backupStats
|
opStats backupStats
|
||||||
backupDetails *details.Details
|
backupDetails *details.Builder
|
||||||
|
toMerge map[string]path.Path
|
||||||
tenantID = op.account.ID()
|
tenantID = op.account.ID()
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
|
detailsStore = streamstore.New(op.kopia, tenantID, op.Selectors.PathService())
|
||||||
)
|
)
|
||||||
|
|
||||||
op.Results.BackupID = model.StableID(uuid.NewString())
|
op.Results.BackupID = model.StableID(uuid.NewString())
|
||||||
@ -122,7 +128,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = op.createBackupModels(ctx, opStats.k.SnapshotID, backupDetails)
|
err = op.createBackupModels(
|
||||||
|
ctx,
|
||||||
|
detailsStore,
|
||||||
|
opStats.k.SnapshotID,
|
||||||
|
backupDetails.Details(),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
opStats.writeErr = err
|
opStats.writeErr = err
|
||||||
}
|
}
|
||||||
@ -149,7 +160,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
return opStats.readErr
|
return opStats.readErr
|
||||||
}
|
}
|
||||||
|
|
||||||
opStats.k, backupDetails, _, err = consumeBackupDataCollections(
|
opStats.k, backupDetails, toMerge, err = consumeBackupDataCollections(
|
||||||
ctx,
|
ctx,
|
||||||
op.kopia,
|
op.kopia,
|
||||||
tenantID,
|
tenantID,
|
||||||
@ -168,6 +179,18 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
|
opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if err = mergeDetails(
|
||||||
|
ctx,
|
||||||
|
op.store,
|
||||||
|
detailsStore,
|
||||||
|
mans,
|
||||||
|
toMerge,
|
||||||
|
backupDetails,
|
||||||
|
); err != nil {
|
||||||
|
opStats.writeErr = errors.Wrap(err, "merging backup details")
|
||||||
|
return opStats.writeErr
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: should always be 1, since backups are 1:1 with resourceOwners now.
|
// TODO: should always be 1, since backups are 1:1 with resourceOwners now.
|
||||||
opStats.resourceCount = len(data.ResourceOwnerSet(cs))
|
opStats.resourceCount = len(data.ResourceOwnerSet(cs))
|
||||||
opStats.started = true
|
opStats.started = true
|
||||||
@ -353,7 +376,7 @@ type backuper interface {
|
|||||||
service path.ServiceType,
|
service path.ServiceType,
|
||||||
oc *kopia.OwnersCats,
|
oc *kopia.OwnersCats,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) (*kopia.BackupStats, *details.Details, map[string]path.Path, error)
|
) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// calls kopia to backup the collections of data
|
// calls kopia to backup the collections of data
|
||||||
@ -366,7 +389,7 @@ func consumeBackupDataCollections(
|
|||||||
mans []*kopia.ManifestEntry,
|
mans []*kopia.ManifestEntry,
|
||||||
cs []data.Collection,
|
cs []data.Collection,
|
||||||
backupID model.StableID,
|
backupID model.StableID,
|
||||||
) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) {
|
) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) {
|
||||||
complete, closer := observe.MessageWithCompletion("Backing up data:")
|
complete, closer := observe.MessageWithCompletion("Backing up data:")
|
||||||
defer func() {
|
defer func() {
|
||||||
complete <- struct{}{}
|
complete <- struct{}{}
|
||||||
@ -556,6 +579,7 @@ func (op *BackupOperation) persistResults(
|
|||||||
// stores the operation details, results, and selectors in the backup manifest.
|
// stores the operation details, results, and selectors in the backup manifest.
|
||||||
func (op *BackupOperation) createBackupModels(
|
func (op *BackupOperation) createBackupModels(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
detailsStore detailsWriter,
|
||||||
snapID string,
|
snapID string,
|
||||||
backupDetails *details.Details,
|
backupDetails *details.Details,
|
||||||
) error {
|
) error {
|
||||||
@ -563,11 +587,7 @@ func (op *BackupOperation) createBackupModels(
|
|||||||
return errors.New("no backup details to record")
|
return errors.New("no backup details to record")
|
||||||
}
|
}
|
||||||
|
|
||||||
detailsID, err := streamstore.New(
|
detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails)
|
||||||
op.kopia,
|
|
||||||
op.account.ID(),
|
|
||||||
op.Selectors.PathService(),
|
|
||||||
).WriteBackupDetails(ctx, backupDetails)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "creating backupdetails model")
|
return errors.Wrap(err, "creating backupdetails model")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -299,12 +299,12 @@ func (mbu mockBackuper) BackupCollections(
|
|||||||
service path.ServiceType,
|
service path.ServiceType,
|
||||||
oc *kopia.OwnersCats,
|
oc *kopia.OwnersCats,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) {
|
) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) {
|
||||||
if mbu.checkFunc != nil {
|
if mbu.checkFunc != nil {
|
||||||
mbu.checkFunc(bases, cs, service, oc, tags)
|
mbu.checkFunc(bases, cs, service, oc, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &kopia.BackupStats{}, &details.Details{}, nil, nil
|
return &kopia.BackupStats{}, &details.Builder{}, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Paths() {
|
func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Paths() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user