Add new code for finding bases for incremental backups (#3502)
Add a new set of functions to find bases for incremental backups. The new set of functions return * set of backup models and the reasons for selecting each * set of snapshots to merge and the reasons for selecting each * set of snapshots for kopia-assisted incrementals and reasons for selecting each New code is not wired into existing system so no behavioral changes are expected --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No #### Type of change - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup #### Issue(s) * #3202 #### Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
264c1968ab
commit
9675c54c46
259
src/internal/kopia/base_finder.go
Normal file
259
src/internal/kopia/base_finder.go
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
package kopia
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/kopia/kopia/repo/manifest"
|
||||||
|
"github.com/kopia/kopia/snapshot"
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/model"
|
||||||
|
"github.com/alcionai/corso/src/internal/operations/inject"
|
||||||
|
"github.com/alcionai/corso/src/pkg/backup"
|
||||||
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BackupBases struct {
|
||||||
|
Backups []BackupEntry
|
||||||
|
MergeBases []ManifestEntry
|
||||||
|
AssistBases []ManifestEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
type BackupEntry struct {
|
||||||
|
*backup.Backup
|
||||||
|
Reasons []Reason
|
||||||
|
}
|
||||||
|
|
||||||
|
type baseFinder struct {
|
||||||
|
sm snapshotManager
|
||||||
|
bg inject.GetBackuper
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBaseFinder(
|
||||||
|
sm snapshotManager,
|
||||||
|
bg inject.GetBackuper,
|
||||||
|
) (*baseFinder, error) {
|
||||||
|
if sm == nil {
|
||||||
|
return nil, clues.New("nil snapshotManager")
|
||||||
|
}
|
||||||
|
|
||||||
|
if bg == nil {
|
||||||
|
return nil, clues.New("nil GetBackuper")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &baseFinder{
|
||||||
|
sm: sm,
|
||||||
|
bg: bg,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *baseFinder) getBackupModel(
|
||||||
|
ctx context.Context,
|
||||||
|
man *snapshot.Manifest,
|
||||||
|
) (*backup.Backup, error) {
|
||||||
|
k, _ := makeTagKV(TagBackupID)
|
||||||
|
bID := man.Tags[k]
|
||||||
|
|
||||||
|
ctx = clues.Add(ctx, "search_backup_id", bID)
|
||||||
|
|
||||||
|
bup, err := b.bg.GetBackup(ctx, model.StableID(bID))
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Stack(err).WithClues(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return bup, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// findBasesInSet goes through manifest metadata entries and sees if they're
|
||||||
|
// incomplete or not. If an entry is incomplete and we don't already have a
|
||||||
|
// complete or incomplete manifest add it to the set for kopia assisted
|
||||||
|
// incrementals. If it's complete, fetch the backup model and see if it
|
||||||
|
// corresponds to a successful backup. If it does, return it as we only need the
|
||||||
|
// most recent complete backup as the base.
|
||||||
|
func (b *baseFinder) findBasesInSet(
|
||||||
|
ctx context.Context,
|
||||||
|
reason Reason,
|
||||||
|
metas []*manifest.EntryMetadata,
|
||||||
|
) (*BackupEntry, *ManifestEntry, []ManifestEntry, error) {
|
||||||
|
// Sort manifests by time so we can go through them sequentially. The code in
|
||||||
|
// kopia appears to sort them already, but add sorting here just so we're not
|
||||||
|
// reliant on undocumented behavior.
|
||||||
|
sort.Slice(metas, func(i, j int) bool {
|
||||||
|
return metas[i].ModTime.Before(metas[j].ModTime)
|
||||||
|
})
|
||||||
|
|
||||||
|
var (
|
||||||
|
kopiaAssistSnaps []ManifestEntry
|
||||||
|
foundIncomplete bool
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := len(metas) - 1; i >= 0; i-- {
|
||||||
|
meta := metas[i]
|
||||||
|
ictx := clues.Add(ctx, "search_snapshot_id", meta.ID)
|
||||||
|
|
||||||
|
man, err := b.sm.LoadSnapshot(ictx, meta.ID)
|
||||||
|
if err != nil {
|
||||||
|
// Safe to continue here as we'll just end up attempting to use an older
|
||||||
|
// backup as the base.
|
||||||
|
logger.CtxErr(ictx, err).Info("attempting to get snapshot")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(man.IncompleteReason) > 0 {
|
||||||
|
if !foundIncomplete {
|
||||||
|
foundIncomplete = true
|
||||||
|
|
||||||
|
kopiaAssistSnaps = append(kopiaAssistSnaps, ManifestEntry{
|
||||||
|
Manifest: man,
|
||||||
|
Reasons: []Reason{reason},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a complete snapshot so see if we have a backup model for it.
|
||||||
|
bup, err := b.getBackupModel(ictx, man)
|
||||||
|
if err != nil {
|
||||||
|
// Safe to continue here as we'll just end up attempting to use an older
|
||||||
|
// backup as the base.
|
||||||
|
logger.CtxErr(ictx, err).Debug("searching for base backup")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ssid := bup.StreamStoreID
|
||||||
|
if len(ssid) == 0 {
|
||||||
|
ssid = bup.DetailsID
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ssid) == 0 {
|
||||||
|
logger.Ctx(ictx).Debugw(
|
||||||
|
"empty backup stream store ID",
|
||||||
|
"search_backup_id", bup.ID)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we've made it to this point then we're considering the backup
|
||||||
|
// complete as it has both an item data snapshot and a backup details
|
||||||
|
// snapshot.
|
||||||
|
me := ManifestEntry{
|
||||||
|
Manifest: man,
|
||||||
|
Reasons: []Reason{reason},
|
||||||
|
}
|
||||||
|
kopiaAssistSnaps = append(kopiaAssistSnaps, me)
|
||||||
|
|
||||||
|
return &BackupEntry{
|
||||||
|
Backup: bup,
|
||||||
|
Reasons: []Reason{reason},
|
||||||
|
}, &me, kopiaAssistSnaps, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Ctx(ctx).Info("no base backups for reason")
|
||||||
|
|
||||||
|
return nil, nil, kopiaAssistSnaps, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *baseFinder) getBase(
|
||||||
|
ctx context.Context,
|
||||||
|
reason Reason,
|
||||||
|
tags map[string]string,
|
||||||
|
) (*BackupEntry, *ManifestEntry, []ManifestEntry, error) {
|
||||||
|
allTags := map[string]string{}
|
||||||
|
|
||||||
|
for _, k := range reason.TagKeys() {
|
||||||
|
allTags[k] = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
maps.Copy(allTags, tags)
|
||||||
|
allTags = normalizeTagKVs(allTags)
|
||||||
|
|
||||||
|
metas, err := b.sm.FindManifests(ctx, allTags)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, clues.Wrap(err, "getting snapshots")
|
||||||
|
}
|
||||||
|
|
||||||
|
// No snapshots means no backups so we can just exit here.
|
||||||
|
if len(metas) == 0 {
|
||||||
|
return nil, nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.findBasesInSet(ctx, reason, metas)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *baseFinder) FindBases(
|
||||||
|
ctx context.Context,
|
||||||
|
reasons []Reason,
|
||||||
|
tags map[string]string,
|
||||||
|
) (BackupBases, error) {
|
||||||
|
var (
|
||||||
|
// All maps go from ID -> entry. We need to track by ID so we can coalesce
|
||||||
|
// the reason for selecting something. Kopia assisted snapshots also use
|
||||||
|
// ManifestEntry so we have the reasons for selecting them to aid in
|
||||||
|
// debugging.
|
||||||
|
baseBups = map[model.StableID]BackupEntry{}
|
||||||
|
baseSnaps = map[manifest.ID]ManifestEntry{}
|
||||||
|
kopiaAssistSnaps = map[manifest.ID]ManifestEntry{}
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, reason := range reasons {
|
||||||
|
ictx := clues.Add(
|
||||||
|
ctx,
|
||||||
|
"search_service", reason.Service.String(),
|
||||||
|
"search_category", reason.Category.String())
|
||||||
|
logger.Ctx(ictx).Info("searching for previous manifests")
|
||||||
|
|
||||||
|
baseBackup, baseSnap, assistSnaps, err := b.getBase(ictx, reason, tags)
|
||||||
|
if err != nil {
|
||||||
|
logger.Ctx(ctx).Info(
|
||||||
|
"getting base, falling back to full backup for reason",
|
||||||
|
"error", err)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if baseBackup != nil {
|
||||||
|
bs, ok := baseBups[baseBackup.ID]
|
||||||
|
if ok {
|
||||||
|
bs.Reasons = append(bs.Reasons, baseSnap.Reasons...)
|
||||||
|
} else {
|
||||||
|
bs = *baseBackup
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reassign since it's structs not pointers to structs.
|
||||||
|
baseBups[baseBackup.ID] = bs
|
||||||
|
}
|
||||||
|
|
||||||
|
if baseSnap != nil {
|
||||||
|
bs, ok := baseSnaps[baseSnap.ID]
|
||||||
|
if ok {
|
||||||
|
bs.Reasons = append(bs.Reasons, baseSnap.Reasons...)
|
||||||
|
} else {
|
||||||
|
bs = *baseSnap
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reassign since it's structs not pointers to structs.
|
||||||
|
baseSnaps[baseSnap.ID] = bs
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range assistSnaps {
|
||||||
|
bs, ok := kopiaAssistSnaps[s.ID]
|
||||||
|
if ok {
|
||||||
|
bs.Reasons = append(bs.Reasons, s.Reasons...)
|
||||||
|
} else {
|
||||||
|
bs = s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reassign since it's structs not pointers to structs.
|
||||||
|
kopiaAssistSnaps[s.ID] = bs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return BackupBases{
|
||||||
|
Backups: maps.Values(baseBups),
|
||||||
|
MergeBases: maps.Values(baseSnaps),
|
||||||
|
AssistBases: maps.Values(kopiaAssistSnaps),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
1020
src/internal/kopia/base_finder_test.go
Normal file
1020
src/internal/kopia/base_finder_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@ -406,6 +406,18 @@ func checkCompressor(compressor compression.Name) error {
|
|||||||
return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor)))
|
return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *conn) LoadSnapshot(
|
||||||
|
ctx context.Context,
|
||||||
|
id manifest.ID,
|
||||||
|
) (*snapshot.Manifest, error) {
|
||||||
|
man, err := snapshot.LoadSnapshot(ctx, w.Repository, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Stack(err).WithClues(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return man, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (w *conn) LoadSnapshots(
|
func (w *conn) LoadSnapshots(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
ids []manifest.ID,
|
ids []manifest.ID,
|
||||||
|
|||||||
@ -33,4 +33,12 @@ type (
|
|||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
) ([]data.RestoreCollection, error)
|
) ([]data.RestoreCollection, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BaseFinder interface {
|
||||||
|
FindBases(
|
||||||
|
ctx context.Context,
|
||||||
|
reasons []kopia.Reason,
|
||||||
|
tags map[string]string,
|
||||||
|
) (kopia.BackupBases, error)
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|||||||
@ -68,6 +68,8 @@ type snapshotManager interface {
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
tags map[string]string,
|
tags map[string]string,
|
||||||
) ([]*manifest.EntryMetadata, error)
|
) ([]*manifest.EntryMetadata, error)
|
||||||
|
LoadSnapshot(ctx context.Context, id manifest.ID) (*snapshot.Manifest, error)
|
||||||
|
// TODO(ashmrtn): Remove this when we switch to the new BaseFinder.
|
||||||
LoadSnapshots(ctx context.Context, ids []manifest.ID) ([]*snapshot.Manifest, error)
|
LoadSnapshots(ctx context.Context, ids []manifest.ID) ([]*snapshot.Manifest, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
"github.com/kopia/kopia/fs"
|
"github.com/kopia/kopia/fs"
|
||||||
"github.com/kopia/kopia/repo/manifest"
|
"github.com/kopia/kopia/repo/manifest"
|
||||||
"github.com/kopia/kopia/snapshot"
|
"github.com/kopia/kopia/snapshot"
|
||||||
@ -15,85 +16,6 @@ import (
|
|||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
testCompleteMan = false
|
|
||||||
testIncompleteMan = !testCompleteMan
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
testT1 = time.Now()
|
|
||||||
testT2 = testT1.Add(1 * time.Hour)
|
|
||||||
testT3 = testT2.Add(1 * time.Hour)
|
|
||||||
|
|
||||||
testID1 = manifest.ID("snap1")
|
|
||||||
testID2 = manifest.ID("snap2")
|
|
||||||
testID3 = manifest.ID("snap3")
|
|
||||||
|
|
||||||
testMail = path.ExchangeService.String() + path.EmailCategory.String()
|
|
||||||
testEvents = path.ExchangeService.String() + path.EventsCategory.String()
|
|
||||||
|
|
||||||
testUser1 = "user1"
|
|
||||||
testUser2 = "user2"
|
|
||||||
testUser3 = "user3"
|
|
||||||
|
|
||||||
testAllUsersAllCats = []Reason{
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser1,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser1,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EventsCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser2,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser2,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EventsCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser3,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser3,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EventsCategory,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
testAllUsersMail = []Reason{
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser1,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser2,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ResourceOwner: testUser3,
|
|
||||||
Service: path.ExchangeService,
|
|
||||||
Category: path.EmailCategory,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
type manifestInfo struct {
|
|
||||||
// We don't currently use the values in the tags.
|
|
||||||
tags map[string]struct{}
|
|
||||||
metadata *manifest.EntryMetadata
|
|
||||||
man *snapshot.Manifest
|
|
||||||
}
|
|
||||||
|
|
||||||
func newManifestInfo(
|
func newManifestInfo(
|
||||||
id manifest.ID,
|
id manifest.ID,
|
||||||
modTime time.Time,
|
modTime time.Time,
|
||||||
@ -105,11 +27,11 @@ func newManifestInfo(
|
|||||||
incompleteStr = "checkpoint"
|
incompleteStr = "checkpoint"
|
||||||
}
|
}
|
||||||
|
|
||||||
structTags := make(map[string]struct{}, len(tags))
|
structTags := make(map[string]string, len(tags))
|
||||||
|
|
||||||
for _, t := range tags {
|
for _, t := range tags {
|
||||||
tk, _ := makeTagKV(t)
|
tk, _ := makeTagKV(t)
|
||||||
structTags[tk] = struct{}{}
|
structTags[tk] = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
return manifestInfo{
|
return manifestInfo{
|
||||||
@ -186,6 +108,13 @@ func (msm *mockSnapshotManager) LoadSnapshots(
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (msm *mockSnapshotManager) LoadSnapshot(
|
||||||
|
ctx context.Context,
|
||||||
|
id manifest.ID,
|
||||||
|
) (*snapshot.Manifest, error) {
|
||||||
|
return nil, clues.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
type SnapshotFetchUnitSuite struct {
|
type SnapshotFetchUnitSuite struct {
|
||||||
tester.Suite
|
tester.Suite
|
||||||
}
|
}
|
||||||
@ -951,6 +880,13 @@ func (msm *mockErrorSnapshotManager) LoadSnapshots(
|
|||||||
return msm.sm.LoadSnapshots(ctx, ids)
|
return msm.sm.LoadSnapshots(ctx, ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (msm *mockErrorSnapshotManager) LoadSnapshot(
|
||||||
|
ctx context.Context,
|
||||||
|
id manifest.ID,
|
||||||
|
) (*snapshot.Manifest, error) {
|
||||||
|
return nil, clues.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshots_withErrors() {
|
func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshots_withErrors() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
|
|||||||
@ -6,7 +6,9 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/common/idname"
|
"github.com/alcionai/corso/src/internal/common/idname"
|
||||||
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
|
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/internal/model"
|
||||||
"github.com/alcionai/corso/src/pkg/account"
|
"github.com/alcionai/corso/src/pkg/account"
|
||||||
|
"github.com/alcionai/corso/src/pkg/backup"
|
||||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
"github.com/alcionai/corso/src/pkg/control/repository"
|
"github.com/alcionai/corso/src/pkg/control/repository"
|
||||||
@ -47,4 +49,11 @@ type (
|
|||||||
RepoMaintenancer interface {
|
RepoMaintenancer interface {
|
||||||
RepoMaintenance(ctx context.Context, opts repository.Maintenance) error
|
RepoMaintenance(ctx context.Context, opts repository.Maintenance) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
GetBackuper interface {
|
||||||
|
GetBackup(
|
||||||
|
ctx context.Context,
|
||||||
|
backupID model.StableID,
|
||||||
|
) (*backup.Backup, error)
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user