Find previous kopia snapshots for given set of tags (#1427)

## Description

Helper functions that allow finding the most recent complete
(and maybe incomplete) snapshots for a given set of tags. The
returned snapshots can later be used to do more efficient kopia
snapshots by allowing kopia to determine it's already uploaded
a file and skip uploading it again.

The number of most recent snapshots to return can be tuned (right
now it returns 1 most recent), but may cause more memory usage
during backups.

Kopia currently has some oddities when getting snapshot
manifests via tags:
* tag values cannot be empty or the comparison always returns
  true, selecting all snapshot manifests
* kopia does not currently tag snapshot manifests made during
  checkpoints. A patch to upstream kopia is needed to fix this.
  All other manifests that match the tags will be returned though

Added code is not currently connected to any backup logic.

Viewing by commit may help as there was a tad of lift'n'shift code
movement for the sake of organization

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #1404 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2022-12-01 11:07:44 -08:00 committed by GitHub
parent 2687ecf961
commit e9afbaf0ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 794 additions and 31 deletions

View File

@ -0,0 +1,242 @@
package kopia
import (
"context"
"sort"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
// Constants controlling how snapshot tags are written and looked up.
const (
// Kopia does not compare empty tag values properly right now, so every tag
// gets this placeholder value instead of an empty string.
defaultTagValue = "0"
// Kopia CLI prefixes all user tags with "tag:"[1]. Maintaining this will
// ensure we don't accidentally take reserved tags and that tags can be
// displayed with kopia CLI.
// (permalinks)
// [1] https://github.com/kopia/kopia/blob/05e729a7858a6e86cb48ba29fb53cb6045efce2b/cli/command_snapshot_create.go#L169
userTagPrefix = "tag:"
)
// snapshotManager is the minimal set of manifest lookups this file needs:
// finding manifest metadata by tag and loading full snapshot manifests by ID.
// Keeping it as an interface lets callers substitute their own implementation.
type snapshotManager interface {
// FindManifests returns the metadata of the manifests matching tags.
FindManifests(
ctx context.Context,
tags map[string]string,
) ([]*manifest.EntryMetadata, error)
// LoadSnapshots returns the snapshot manifests for the given manifest IDs.
LoadSnapshots(ctx context.Context, ids []manifest.ID) ([]*snapshot.Manifest, error)
}
// ownersCats holds the sets of resource owners and service/category names that
// are turned into snapshot tags. Both maps are used as sets; only the keys
// carry information.
type ownersCats struct {
// resourceOwners is the set of resource owner identifiers.
resourceOwners map[string]struct{}
// serviceCats is the set of service/category names.
// NOTE(review): presumably these are values produced by serviceCatTag;
// confirm against callers.
serviceCats map[string]struct{}
}
// serviceCatTag builds the service/category name for p: the service string
// immediately followed by the category string, with no separator.
func serviceCatTag(p path.Path) string {
	service := p.Service().String()
	category := p.Category().String()

	return service + category
}
// makeTagKV converts a bare tag name into the key/value pair stored on a
// snapshot: the key is k with userTagPrefix prepended and the value is always
// defaultTagValue.
func makeTagKV(k string) (string, string) {
	key := userTagPrefix + k

	return key, defaultTagValue
}
// tagsFromStrings builds the tag map for every service/category and resource
// owner in oc. Each name is converted with makeTagKV, so all entries share the
// same placeholder value; a single snapshot can cover several owners and
// categories, so only the presence of a key is meaningful.
func tagsFromStrings(oc *ownersCats) map[string]string {
	tags := make(map[string]string, len(oc.serviceCats)+len(oc.resourceOwners))

	// Both fields are plain string sets, so handle them with one loop.
	for _, names := range []map[string]struct{}{oc.serviceCats, oc.resourceOwners} {
		for name := range names {
			key, val := makeTagKV(name)
			tags[key] = val
		}
	}

	return tags
}
// getLastIdx sorts metas in place from oldest to newest ModTime and returns the
// index (in that sorted order) of the newest entry whose manifest is present in
// foundMans and is complete, i.e. has an empty IncompleteReason. Returns -1 if
// no entry in metas has a complete manifest in foundMans.
func getLastIdx(
	foundMans map[manifest.ID]*snapshot.Manifest,
	metas []*manifest.EntryMetadata,
) int {
	// The entries appear to arrive oldest-first already (undocumented), so
	// sorting ascending should need few swaps.
	sort.Slice(metas, func(a, b int) bool {
		return metas[a].ModTime.Before(metas[b].ModTime)
	})

	// Walk from the newest entry backwards and stop at the first cached,
	// complete manifest.
	for idx := len(metas) - 1; idx >= 0; idx-- {
		cached := foundMans[metas[idx].ID]
		if cached != nil && len(cached.IncompleteReason) == 0 {
			return idx
		}
	}

	return -1
}
// manifestsSinceLastComplete picks the manifests worth keeping from mans: the
// newest complete manifest (if any) and, when it is newer than that complete
// manifest, the newest incomplete one. If mans holds only incomplete manifests
// the newest of them is returned. The result is nil when mans is empty.
func manifestsSinceLastComplete(
	mans []*snapshot.Manifest,
) []*snapshot.Manifest {
	var picked []*snapshot.Manifest

	// The input should already follow the order of the IDs used to fetch it,
	// but sort oldest to newest to be safe.
	ordered := snapshot.SortByTime(mans, false)
	haveIncomplete := false

	for i := len(ordered) - 1; i >= 0; i-- {
		cur := ordered[i]

		// The first complete manifest seen from the newest end finishes the
		// search, whether or not an incomplete one was collected before it.
		if len(cur.IncompleteReason) == 0 {
			picked = append(picked, cur)
			break
		}

		// Keep only the newest incomplete manifest; skip older ones.
		if !haveIncomplete {
			picked = append(picked, cur)
			haveIncomplete = true
		}
	}

	return picked
}
// fetchPrevManifests looks up the manifests matching tags and returns the ones
// not yet represented in foundMans: the newest complete manifest and, if it is
// newer than that, the newest incomplete manifest. When only incomplete
// manifests match, the newest one is returned. Nothing is returned (and nothing
// is loaded) if the newest matching manifest is already cached in foundMans and
// complete, or if no manifest matches. On error no manifests are returned.
func fetchPrevManifests(
	ctx context.Context,
	sm snapshotManager,
	foundMans map[manifest.ID]*snapshot.Manifest,
	tags map[string]string,
) ([]*snapshot.Manifest, error) {
	metas, err := sm.FindManifests(ctx, tags)
	if err != nil {
		return nil, errors.Wrap(err, "fetching manifest metas by tag")
	}

	total := len(metas)
	if total == 0 {
		return nil, nil
	}

	// getLastIdx sorts metas oldest to newest; everything from firstNewer on is
	// newer than the newest complete manifest we already hold.
	firstNewer := getLastIdx(foundMans, metas) + 1

	// The newest match is already cached and complete, nothing to load.
	if firstNewer == total {
		return nil, nil
	}

	// TODO(ashmrtn): Remainder of the function can be simplified if we can inject
	// different tags to the snapshot checkpoints than the complete snapshot.

	// Load every manifest newer than the newest cached complete one. This may
	// re-fetch an incomplete manifest we have seen before, but it keeps the
	// "newest incomplete newer than newest complete" selection simple.
	ids := make([]manifest.ID, 0, total-firstNewer)
	for _, meta := range metas[firstNewer:] {
		ids = append(ids, meta.ID)
	}

	loaded, err := sm.LoadSnapshots(ctx, ids)
	if err != nil {
		return nil, errors.Wrap(err, "fetching previous manifests")
	}

	return manifestsSinceLastComplete(loaded), nil
}
// fetchPrevSnapshotManifests collects previous snapshot manifests for every
// (service/category, resource owner) combination in oc. For each combination at
// most two manifests are contributed: the newest complete one and, if newer
// than it, the newest incomplete one. A manifest matched by several
// combinations appears only once in the result. Lookup errors are logged and
// the affected combination is skipped. The order of the result is unspecified.
//
// TODO(ashmrtn): Use to get previous manifests so backup can find previously
// uploaded versions of a file.
func fetchPrevSnapshotManifests(
	ctx context.Context,
	sm snapshotManager,
	oc *ownersCats,
) []*snapshot.Manifest {
	// Manifests gathered so far, keyed by ID so duplicates collapse and so
	// later lookups can skip work already done.
	seen := map[manifest.ID]*snapshot.Manifest{}

	// Only the most recent snapshots per combination are gathered. Returning
	// more is possible but may cost more memory at runtime.
	for serviceCat := range oc.serviceCats {
		catKey, catVal := makeTagKV(serviceCat)

		for owner := range oc.resourceOwners {
			ownerKey, ownerVal := makeTagKV(owner)

			tags := make(map[string]string, 2)
			tags[catKey] = catVal
			tags[ownerKey] = ownerVal

			newer, err := fetchPrevManifests(ctx, sm, seen, tags)
			if err != nil {
				// The snapshot can still complete without these manifests,
				// it will just be less efficient.
				logger.Ctx(ctx).Warnw(
					"fetching previous snapshot manifests for service/category/resource owner",
					"error",
					err,
					"service/category",
					serviceCat,
				)

				continue
			}

			// Record whatever more recent snapshots were found.
			for _, man := range newer {
				seen[man.ID] = man
			}
		}
	}

	out := make([]*snapshot.Manifest, 0, len(seen))
	for _, man := range seen {
		out = append(out, man)
	}

	return out
}

View File

@ -0,0 +1,540 @@
package kopia
import (
"context"
"testing"
"time"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/path"
)
// Readable names for the "incomplete" flag passed to newManifestInfo.
const (
testCompleteMan = false
testIncompleteMan = !testCompleteMan
)
// Shared fixtures for the snapshot fetch tests.
var (
// Three timestamps one hour apart, oldest first.
testT1 = time.Now()
testT2 = testT1.Add(1 * time.Hour)
testT3 = testT2.Add(1 * time.Hour)
testID1 = manifest.ID("snap1")
testID2 = manifest.ID("snap2")
testID3 = manifest.ID("snap3")
// Service/category names: service string followed by category string.
testMail = path.ExchangeService.String() + path.EmailCategory.String()
testEvents = path.ExchangeService.String() + path.EventsCategory.String()
testUser1 = "user1"
testUser2 = "user2"
testUser3 = "user3"
// All three test users with both the mail and events service/categories.
testAllUsersAllCats = &ownersCats{
resourceOwners: map[string]struct{}{
testUser1: {},
testUser2: {},
testUser3: {},
},
serviceCats: map[string]struct{}{
testMail: {},
testEvents: {},
},
}
// All three test users with only the mail service/category.
testAllUsersMail = &ownersCats{
resourceOwners: map[string]struct{}{
testUser1: {},
testUser2: {},
testUser3: {},
},
serviceCats: map[string]struct{}{
testMail: {},
},
}
)
// manifestInfo bundles everything the mock snapshot manager needs to know
// about one fake snapshot: its tag keys, its entry metadata, and its manifest.
type manifestInfo struct {
// We don't currently use the values in the tags.
tags map[string]struct{}
// metadata is what mockSnapshotManager.FindManifests returns for a match.
metadata *manifest.EntryMetadata
// man is what mockSnapshotManager.LoadSnapshots returns for a matching ID.
man *snapshot.Manifest
}
// newManifestInfo builds a fake snapshot with the given ID and time. tags are
// bare tag names; each is converted to its stored key with makeTagKV. When
// incomplete is true the manifest's IncompleteReason is set to "checkpoint",
// otherwise it is left empty.
func newManifestInfo(
	id manifest.ID,
	modTime time.Time,
	incomplete bool,
	tags ...string,
) manifestInfo {
	var reason string
	if incomplete {
		reason = "checkpoint"
	}

	tagSet := make(map[string]struct{}, len(tags))

	for _, tag := range tags {
		key, _ := makeTagKV(tag)
		tagSet[key] = struct{}{}
	}

	meta := &manifest.EntryMetadata{
		ID:      id,
		ModTime: modTime,
	}

	man := &snapshot.Manifest{
		ID:               id,
		StartTime:        fs.UTCTimestamp(modTime.UnixNano()),
		IncompleteReason: reason,
	}

	return manifestInfo{
		tags:     tagSet,
		metadata: meta,
		man:      man,
	}
}
// mockSnapshotManager is an in-memory snapshotManager backed by a fixed list
// of fake snapshots.
type mockSnapshotManager struct {
// data is the set of fake snapshots that lookups are served from.
data []manifestInfo
// loadCallback, if non-nil, is invoked with the IDs passed to each
// LoadSnapshots call so tests can inspect them.
loadCallback func(ids []manifest.ID)
}
// matchesTags reports whether mi carries every key in tags. Tag values are
// ignored, and an empty tags map matches everything.
func matchesTags(mi manifestInfo, tags map[string]string) bool {
	for want := range tags {
		_, present := mi.tags[want]
		if present {
			continue
		}

		return false
	}

	return true
}
// FindManifests returns the metadata of every fake snapshot in msm.data whose
// tags contain all keys in tags, in data order. The result is an empty,
// non-nil slice when nothing matches. A nil receiver returns assert.AnError.
func (msm *mockSnapshotManager) FindManifests(
	ctx context.Context,
	tags map[string]string,
) ([]*manifest.EntryMetadata, error) {
	if msm == nil {
		return nil, assert.AnError
	}

	matches := make([]*manifest.EntryMetadata, 0)

	for i := range msm.data {
		if !matchesTags(msm.data[i], tags) {
			continue
		}

		matches = append(matches, msm.data[i].metadata)
	}

	return matches, nil
}
// LoadSnapshots returns the manifests of the fake snapshots whose IDs appear in
// ids, ordered by ids and then by position in msm.data. Unknown IDs are
// silently skipped. If a loadCallback is set it is called with ids first. A nil
// receiver returns assert.AnError.
func (msm *mockSnapshotManager) LoadSnapshots(
	ctx context.Context,
	ids []manifest.ID,
) ([]*snapshot.Manifest, error) {
	if msm == nil {
		return nil, assert.AnError
	}

	// Let the test observe exactly which IDs were requested.
	if cb := msm.loadCallback; cb != nil {
		cb(ids)
	}

	loaded := make([]*snapshot.Manifest, 0)

	for _, want := range ids {
		for i := range msm.data {
			if man := msm.data[i].man; man.ID == want {
				loaded = append(loaded, man)
			}
		}
	}

	return loaded, nil
}
// SnapshotFetchUnitSuite groups the unit tests for fetching previous snapshot
// manifests. It embeds testify's suite.Suite and adds no state of its own.
type SnapshotFetchUnitSuite struct {
suite.Suite
}
func TestSnapshotFetchUnitSuite(t *testing.T) {
suite.Run(t, new(SnapshotFetchUnitSuite))
}
// TestFetchPrevSnapshots runs fetchPrevSnapshotManifests against a mock
// snapshot manager for a table of fake snapshot layouts. Each case checks which
// manifests are returned and that no manifest is loaded more often than the
// case allows (a check that caching works).
func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshots() {
	// allUsers builds a fake snapshot tagged with the given service/categories
	// followed by all three test users.
	allUsers := func(
		id manifest.ID,
		modTime time.Time,
		incomplete bool,
		serviceCats ...string,
	) manifestInfo {
		tags := make([]string, 0, len(serviceCats)+3)
		tags = append(tags, serviceCats...)
		tags = append(tags, testUser1, testUser2, testUser3)

		return newManifestInfo(id, modTime, incomplete, tags...)
	}

	type testCase struct {
		name  string
		input *ownersCats
		data  []manifestInfo
		// Indexes into data of the manifests expected in the result. Lets the
		// table define each fake snapshot once instead of repeating it in an
		// expected list.
		expectedIdxs []int
		// Upper bound on how many times each manifest may be loaded from kopia.
		// Used to check that caching is functioning properly.
		expectedLoadCounts map[manifest.ID]int
	}

	table := []testCase{
		{
			name:  "AllOneSnapshot",
			input: testAllUsersAllCats,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail, testEvents),
			},
			expectedIdxs:       []int{0},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1},
		},
		{
			name:  "SplitByCategory",
			input: testAllUsersAllCats,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail),
				allUsers(testID2, testT2, testCompleteMan, testEvents),
			},
			expectedIdxs:       []int{0, 1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1, testID2: 1},
		},
		{
			name:  "IncompleteNewerThanComplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail),
				allUsers(testID2, testT2, testIncompleteMan, testMail),
			},
			expectedIdxs:       []int{0, 1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1, testID2: 3},
		},
		{
			name:  "IncompleteOlderThanComplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testIncompleteMan, testMail),
				allUsers(testID2, testT2, testCompleteMan, testMail),
			},
			expectedIdxs:       []int{1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1, testID2: 1},
		},
		{
			name:  "OnlyIncomplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testIncompleteMan, testMail),
			},
			expectedIdxs:       []int{0},
			expectedLoadCounts: map[manifest.ID]int{testID1: 3},
		},
		{
			name:  "NewestComplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail),
				allUsers(testID2, testT2, testCompleteMan, testMail),
			},
			expectedIdxs:       []int{1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1, testID2: 1},
		},
		{
			name:  "NewestIncomplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testIncompleteMan, testMail),
				allUsers(testID2, testT2, testIncompleteMan, testMail),
			},
			expectedIdxs:       []int{1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 3, testID2: 3},
		},
		{
			name:  "SomeCachedSomeNewer",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail),
				newManifestInfo(testID2, testT2, testCompleteMan, testMail, testUser3),
			},
			expectedIdxs:       []int{0, 1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 2, testID2: 1},
		},
		{
			name:  "SomeCachedSomeNewerIncomplete",
			input: testAllUsersMail,
			data: []manifestInfo{
				allUsers(testID1, testT1, testCompleteMan, testMail),
				newManifestInfo(testID2, testT2, testIncompleteMan, testMail, testUser3),
			},
			expectedIdxs:       []int{0, 1},
			expectedLoadCounts: map[manifest.ID]int{testID1: 1, testID2: 1},
		},
		{
			name:         "NoMatches",
			input:        testAllUsersMail,
			data:         nil,
			expectedIdxs: nil,
			// Stop failure for nil-map comparison.
			expectedLoadCounts: map[manifest.ID]int{},
		},
	}

	for _, test := range table {
		suite.T().Run(test.name, func(t *testing.T) {
			ctx, flush := tester.NewContext()
			defer flush()

			// Count how often each manifest ID is requested from the mock.
			loadCounts := map[manifest.ID]int{}
			msm := &mockSnapshotManager{
				data: test.data,
				loadCallback: func(ids []manifest.ID) {
					for _, id := range ids {
						loadCounts[id]++
					}
				},
			}

			snaps := fetchPrevSnapshotManifests(ctx, msm, test.input)

			expected := make([]*snapshot.Manifest, len(test.expectedIdxs))
			for pos, dataIdx := range test.expectedIdxs {
				expected[pos] = test.data[dataIdx].man
			}

			assert.ElementsMatch(t, expected, snaps)

			// Need to manually check because we don't know the order the
			// user/service/category labels will be iterated over. For some tests this
			// could cause more loads than the ideal case.
			assert.Len(t, loadCounts, len(test.expectedLoadCounts))

			for id, got := range loadCounts {
				assert.GreaterOrEqual(t, test.expectedLoadCounts[id], got)
			}
		})
	}
}
// mockErrorSnapshotManager returns an error the first time FindManifests is
// called and the first time LoadSnapshots is called. After that it passes the
// calls through to the contained snapshotManager.
type mockErrorSnapshotManager struct {
// retFindErr records whether FindManifests has already returned its error.
retFindErr bool
// retLoadErr records whether LoadSnapshots has already returned its error.
retLoadErr bool
// sm handles every call after the first failing one.
sm snapshotManager
}
// FindManifests fails with assert.AnError on its first call and delegates to
// the wrapped snapshotManager on every later call.
func (msm *mockErrorSnapshotManager) FindManifests(
	ctx context.Context,
	tags map[string]string,
) ([]*manifest.EntryMetadata, error) {
	if msm.retFindErr {
		return msm.sm.FindManifests(ctx, tags)
	}

	// First call: remember that the error has been handed out.
	msm.retFindErr = true

	return nil, assert.AnError
}
// LoadSnapshots fails with assert.AnError on its first call and delegates to
// the wrapped snapshotManager on every later call.
func (msm *mockErrorSnapshotManager) LoadSnapshots(
	ctx context.Context,
	ids []manifest.ID,
) ([]*snapshot.Manifest, error) {
	if msm.retLoadErr {
		return msm.sm.LoadSnapshots(ctx, ids)
	}

	// First call: remember that the error has been handed out.
	msm.retLoadErr = true

	return nil, assert.AnError
}
// TestFetchPrevSnapshotsWorksWithErrors checks that fetchPrevSnapshotManifests
// keeps going when individual lookups fail. Each of the three test users owns
// exactly one complete mail snapshot. The mock fails the first FindManifests
// call and the first LoadSnapshots call, so two users are skipped and exactly
// one manifest comes back.
func (suite *SnapshotFetchUnitSuite) TestFetchPrevSnapshotsWorksWithErrors() {
	ctx, flush := tester.NewContext()
	defer flush()

	t := suite.T()

	input := testAllUsersMail
	mockData := []manifestInfo{
		newManifestInfo(
			testID1,
			testT1,
			testCompleteMan,
			testMail,
			testUser1,
		),
		newManifestInfo(
			testID2,
			testT2,
			testCompleteMan,
			testMail,
			testUser2,
		),
		newManifestInfo(
			testID3,
			testT3,
			testCompleteMan,
			testMail,
			testUser3,
		),
	}

	msm := &mockErrorSnapshotManager{
		sm: &mockSnapshotManager{
			data: mockData,
		},
	}

	snaps := fetchPrevSnapshotManifests(ctx, msm, input)

	// Only one snapshot should be chosen because the other two attempts fail.
	// Which one is non-deterministic: resource owners are iterated from a map,
	// so any of the three users may be the one whose lookup succeeds. Pinning
	// the result to a specific manifest would make this test flaky.
	if !assert.Len(t, snaps, 1) {
		return
	}

	candidates := make([]*snapshot.Manifest, 0, len(mockData))
	for _, mi := range mockData {
		candidates = append(candidates, mi.man)
	}

	assert.Contains(t, candidates, snaps[0])
}

View File

@ -522,32 +522,6 @@ func (w Wrapper) BackupCollections(
return s, progress.deets, nil
}
type ownersCats struct {
resourceOwners map[string]struct{}
serviceCats map[string]struct{}
}
func serviceCatTag(p path.Path) string {
return p.Service().String() + p.Category().String()
}
// tagsFromStrings returns a map[string]string with the union of both maps
// passed in. Currently uses empty values for each tag because there can be
// multiple instances of resource owners and categories in a single snapshot.
func tagsFromStrings(oc *ownersCats) map[string]string {
res := make(map[string]string, len(oc.serviceCats)+len(oc.resourceOwners))
for k := range oc.serviceCats {
res[k] = ""
}
for k := range oc.resourceOwners {
res[k] = ""
}
return res
}
func (w Wrapper) makeSnapshotWithRoot(
ctx context.Context,
root fs.Directory,

View File

@ -851,11 +851,18 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
42,
),
}
expectedTags := map[string]string{
serviceCatTag(suite.testPath1): "",
suite.testPath1.ResourceOwner(): "",
serviceCatTag(suite.testPath2): "",
suite.testPath2.ResourceOwner(): "",
baseTagKeys := []string{
serviceCatTag(suite.testPath1),
suite.testPath1.ResourceOwner(),
serviceCatTag(suite.testPath2),
suite.testPath2.ResourceOwner(),
}
expectedTags := map[string]string{}
for _, k := range baseTagKeys {
tk, tv := makeTagKV(k)
expectedTags[tk] = tv
}
stats, deets, err := suite.w.BackupCollections(suite.ctx, collections, path.ExchangeService)