Use more unique data in the kopia SourceInfo for snapshots (#5093)

Use unique values for the kopia snapshot SourceInfo to help isolate
snapshots for different resources from each other: the host is now the
set of service/category pairs being backed up, and the user is the
tenant and protected resource IDs. This will come in handy for both
checkpoint snapshots and regular snapshots if kopia's retention
policies ever select snapshots by source.
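
As a rough before/after sketch (the host/user values are the ones the updated tests below expect; treat this as illustrative, not the exact implementation):

```go
package main

import (
	"fmt"

	"github.com/kopia/kopia/snapshot"
)

func main() {
	// Before: every snapshot in the repo shared one static source.
	before := snapshot.SourceInfo{
		Host:     "corso-host",
		UserName: "corso",
		Path:     "test-path-root",
	}

	// After: the host is the sorted set of service+category names joined
	// with "-", and the user is "<tenantID>-<resourceID>". For a single
	// Exchange email reason this becomes:
	after := snapshot.SourceInfo{
		Host:     "exchangeemail",
		UserName: "tenantID-resourceID",
		Path:     "test-path-root",
	}

	fmt.Println(before, after)
}
```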

Integration tests for this change probably won't pass until the next PR
in the series is merged into this one.

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

merge after:
* #5091
* #5092

#### Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E

---

11 changed files with 342 additions and 44 deletions

View File

@@ -4,6 +4,7 @@ run:
linters:
enable:
- errcheck
- exhaustive
- forbidigo
- gci
- gofmt
@@ -25,6 +26,11 @@ linters:
- staticcheck
linters-settings:
exhaustive:
check:
- switch
default-signifies-exhaustive: false
explicit-exhaustive-switch: true
gci:
sections:
- standard

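For context on the linter change: with `explicit-exhaustive-switch: true`, the `exhaustive` linter only checks switch statements that carry the enforce directive (the same directive added to `ToMetadata` later in this diff). A minimal sketch with a hypothetical enum:

```go
package main

type Color int

const (
	Red Color = iota
	Green
	Blue
)

// name is checked by the exhaustive linter only because of the directive
// below; with explicit-exhaustive-switch, unannotated switches are skipped.
func name(c Color) string {
	//exhaustive:enforce
	switch c {
	case Red:
		return "red"
	case Green:
		return "green"
	case Blue:
		return "blue"
	}

	return "unknown"
}

func main() {
	println(name(Green))
}
```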
View File

@@ -234,8 +234,8 @@ func (suite *WrapperIntegrationSuite) TestGetPolicyOrDefault_GetsDefault() {
}()
si := snapshot.SourceInfo{
Host: corsoHost,
UserName: corsoUser,
Host: "exchangeemail",
UserName: "tenantID-resourceID",
Path: "test-path-root",
}
@@ -271,8 +271,8 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() {
// Check the global policy will be the effective policy in future snapshots
// for some source info.
si := snapshot.SourceInfo{
Host: corsoHost,
UserName: corsoUser,
Host: "exchangeemail",
UserName: "tenantID-resourceID",
Path: "test-path-root",
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/readers"
@@ -33,13 +34,6 @@ import (
"github.com/alcionai/corso/src/pkg/store"
)
const (
// TODO(ashmrtnz): These should be some values from upper layer corso,
// possibly corresponding to who is making the backup.
corsoHost = "corso-host"
corsoUser = "corso"
)
// common manifest tags
const (
TagBackupID = "backup-id"
@@ -202,24 +196,12 @@ func (w Wrapper) ConsumeBackupCollections(
return nil, nil, nil, clues.Wrap(err, "building kopia directories")
}
// Add some extra tags so we can look things up by reason.
tags := maps.Clone(additionalTags)
if tags == nil {
// Some platforms seem to return nil if the input is nil.
tags = map[string]string{}
}
for _, r := range backupReasons {
for _, k := range tagKeys(r) {
tags[k] = ""
}
}
s, err := w.makeSnapshotWithRoot(
ctx,
backupReasons,
assistBase,
dirTree,
tags,
additionalTags,
progress)
if err != nil {
return nil, nil, nil, err
@@ -228,8 +210,72 @@ func (w Wrapper) ConsumeBackupCollections(
return s, progress.deets, progress.toMerge, progress.errs.Failure()
}
// userAndHost is used as a passing mechanism for values that will be fed into
// kopia's UserName and Host fields for SourceInfo. It exists to avoid returning
// two strings from the hostAndUserFromReasons function.
type userAndHost struct {
user string
host string
}
func hostAndUserFromReasons(reasons []identity.Reasoner) (userAndHost, error) {
var (
tenant string
resource string
// reasonMap is a hash set of the concatenation of the service and category.
reasonMap = map[string]struct{}{}
)
for i, reason := range reasons {
// Use a check on the iteration index instead of empty string so we can
// differentiate between the first iteration and a reason with an empty
// value (should result in an error if there's another reason with a
// non-empty value).
if i == 0 {
tenant = reason.Tenant()
} else if tenant != reason.Tenant() {
return userAndHost{}, clues.New("multiple tenant IDs in backup reasons").
With(
"old_tenant_id", tenant,
"new_tenant_id", reason.Tenant())
}
if i == 0 {
resource = reason.ProtectedResource()
} else if resource != reason.ProtectedResource() {
return userAndHost{}, clues.New("multiple protected resource IDs in backup reasons").
With(
"old_resource_id", resource,
"new_resource_id", reason.ProtectedResource())
}
dataType := reason.Service().String() + reason.Category().String()
reasonMap[dataType] = struct{}{}
}
allReasons := maps.Keys(reasonMap)
slices.Sort(allReasons)
host := strings.Join(allReasons, "-")
user := strings.Join([]string{tenant, resource}, "-")
if len(user) == 0 || user == "-" {
return userAndHost{}, clues.New("empty user value")
}
if len(host) == 0 {
return userAndHost{}, clues.New("empty host value")
}
return userAndHost{
host: host,
user: user,
}, nil
}
func (w Wrapper) makeSnapshotWithRoot(
ctx context.Context,
backupReasons []identity.Reasoner,
prevBases []BackupBase,
root fs.Directory,
addlTags map[string]string,
@@ -252,11 +298,24 @@ func (w Wrapper) makeSnapshotWithRoot(
snapIDs = append(snapIDs, ent.ItemDataSnapshot.ID)
}
// Add some extra tags so we can look things up by reason.
allTags := maps.Clone(addlTags)
if allTags == nil {
// Some platforms seem to return nil if the input is nil.
allTags = map[string]string{}
}
for _, r := range backupReasons {
for _, k := range tagKeys(r) {
allTags[k] = ""
}
}
ctx = clues.Add(
ctx,
"num_assist_snapshots", len(prevBases),
"assist_snapshot_ids", snapIDs,
"additional_tags", addlTags)
"additional_tags", allTags)
if len(snapIDs) > 0 {
logger.Ctx(ctx).Info("using snapshots for kopia-assisted incrementals")
@@ -266,7 +325,7 @@ func (w Wrapper) makeSnapshotWithRoot(
tags := map[string]string{}
for k, v := range addlTags {
for k, v := range allTags {
mk, mv := makeTagKV(k)
if len(v) == 0 {
@@ -276,7 +335,16 @@ func (w Wrapper) makeSnapshotWithRoot(
tags[mk] = v
}
err := repo.WriteSession(
// Set the SourceInfo to the tenant ID, resource ID, and the concatenation
// of the service/data types being backed up. This will give us unique
// values for each set of backups with the assumption that no concurrent
// backups for the same set of things are being run on this repo.
userHost, err := hostAndUserFromReasons(backupReasons)
if err != nil {
return nil, clues.StackWC(ctx, err)
}
err = repo.WriteSession(
ctx,
w.c,
repo.WriteSessionOptions{
@@ -288,10 +356,9 @@
},
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
si := snapshot.SourceInfo{
Host: corsoHost,
UserName: corsoUser,
// TODO(ashmrtnz): will this be something useful for snapshot lookups later?
Path: root.Name(),
Host: userHost.host,
UserName: userHost.user,
Path: root.Name(),
}
trueVal := policy.OptionalBool(true)

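Stripped of its error handling, the derivation in `hostAndUserFromReasons` reduces to a dedupe-sort-join. A standalone sketch with hypothetical inputs (stdlib `sort` stands in for `x/exp/slices`):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Hypothetical reasons: service+category pairs for one tenant/resource.
	dataTypes := map[string]struct{}{
		"exchangeemail":    {},
		"exchangecontacts": {},
	}

	// Dedupe via the map keys, then sort so the host value is
	// deterministic regardless of the order reasons arrive in.
	keys := make([]string, 0, len(dataTypes))
	for k := range dataTypes {
		keys = append(keys, k)
	}

	sort.Strings(keys)

	host := strings.Join(keys, "-")
	user := strings.Join([]string{"tenantID", "resourceID"}, "-")

	fmt.Println(host) // exchangecontacts-exchangeemail
	fmt.Println(user) // tenantID-resourceID
}
```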
View File

@@ -3,6 +3,7 @@ package kopia
import (
"bytes"
"context"
"fmt"
"io"
stdpath "path"
"strings"
@@ -375,6 +376,146 @@ func (suite *BasicKopiaIntegrationSuite) TestUpdatePersistentConfig() {
mutableParams.EpochParameters.MinEpochDuration)
}
func (suite *BasicKopiaIntegrationSuite) TestConsumeBackupCollections_SetsSourceInfo() {
table := []struct {
name string
reasons []identity.Reasoner
expectError assert.ErrorAssertionFunc
expectUser string
expectHost string
}{
{
name: "DifferentDataTypesInService",
reasons: []identity.Reasoner{
identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory),
identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory),
},
expectError: assert.NoError,
expectUser: testTenant + "-" + testUser,
expectHost: path.ExchangeService.String() +
path.ContactsCategory.String() + "-" + path.ExchangeService.String() +
path.EmailCategory.String(),
},
{
name: "DifferentServices",
reasons: []identity.Reasoner{
identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory),
identity.NewReason(testTenant, testUser, path.OneDriveService, path.FilesCategory),
},
expectError: assert.NoError,
expectUser: testTenant + "-" + testUser,
expectHost: path.ExchangeService.String() +
path.EmailCategory.String() + "-" + path.OneDriveService.String() +
path.FilesCategory.String(),
},
{
name: "EmptyTenant",
reasons: []identity.Reasoner{
identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory),
},
expectError: assert.NoError,
expectUser: "-" + testUser,
expectHost: path.ExchangeService.String() + path.EmailCategory.String(),
},
{
name: "EmptyResource",
reasons: []identity.Reasoner{
identity.NewReason(testTenant, "", path.ExchangeService, path.EmailCategory),
},
expectError: assert.NoError,
expectUser: testTenant + "-",
expectHost: path.ExchangeService.String() + path.EmailCategory.String(),
},
{
name: "EmptyTenantAndResource Errors",
reasons: []identity.Reasoner{
identity.NewReason("", "", path.ExchangeService, path.EmailCategory),
},
expectError: assert.Error,
},
{
name: "EmptyAndPopulatedTenant Errors",
reasons: []identity.Reasoner{
identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory),
identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory),
},
expectError: assert.Error,
},
{
name: "DifferentTenants Errors",
reasons: []identity.Reasoner{
identity.NewReason(testTenant+"1", testUser, path.ExchangeService, path.EmailCategory),
identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory),
},
expectError: assert.Error,
},
{
name: "DifferentResources Errors",
reasons: []identity.Reasoner{
identity.NewReason(testTenant, testUser+"1", path.ExchangeService, path.EmailCategory),
identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory),
},
expectError: assert.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
var cols []data.BackupCollection
for i, reason := range test.reasons {
colPath, err := path.Build(
testTenant,
testUser,
reason.Service(),
reason.Category(),
false,
fmt.Sprintf("%d", i))
require.NoError(t, err, clues.ToCore(err))
cols = append(cols, exchMock.NewCollection(colPath, colPath, 0))
}
c, err := openLocalKopiaRepo(t, ctx)
require.NoError(t, err, clues.ToCore(err))
wrapper := &Wrapper{c}
defer wrapper.Close(ctx)
stats, _, _, err := wrapper.ConsumeBackupCollections(
ctx,
test.reasons,
nil,
cols,
nil,
nil,
true,
count.New(),
fault.New(true))
test.expectError(t, err, clues.ToCore(err))
if err != nil {
return
}
snap, err := snapshot.LoadSnapshot(
ctx,
wrapper.c,
manifest.ID(stats.SnapshotID))
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectHost, snap.Source.Host, "source host")
assert.Equal(t, test.expectUser, snap.Source.UserName, "source user")
})
}
}
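One payoff of distinct source values, sketched below: kopia can enumerate the snapshots for a single resource and data type directly by source. `snapshot.ListSnapshots` is kopia's API, but this particular lookup is an assumption for illustration, not part of the diff:

```go
package example

import (
	"context"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/snapshot"
)

// listResourceSnapshots is a hypothetical helper: with per-resource
// SourceInfo, only the snapshots for this resource/data-type pair match.
func listResourceSnapshots(
	ctx context.Context,
	rep repo.Repository,
) ([]*snapshot.Manifest, error) {
	return snapshot.ListSnapshots(ctx, rep, snapshot.SourceInfo{
		Host:     "exchangeemail",
		UserName: "tenantID-resourceID",
		Path:     "test-path-root",
	})
}
```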
// ---------------
// integration tests that require object locking to be enabled on the bucket.
// ---------------

View File

@@ -305,8 +305,26 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
op.Errors.Fail(clues.Wrap(err, "running backup"))
}
// Not expected to fail here as we make a similar call in op.do, but we do
// need to make sure we pass some sort of reason for persisting the
// streamstore data.
reasons, err := op.Selectors.Reasons(op.account.ID(), false)
if err != nil {
reasons = []identity.Reasoner{identity.NewReason(
op.account.ID(),
op.Selectors.ID(),
path.UnknownService,
path.UnknownCategory)}
}
LogFaultErrors(ctx, op.Errors.Errors(), "running backup")
op.doPersistence(ctx, &opStats, sstore, deets, startTime)
op.doPersistence(
ctx,
reasons,
&opStats,
sstore,
deets,
startTime)
finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup")
logger.Ctx(ctx).Infow(
@@ -319,6 +337,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
func (op *BackupOperation) doPersistence(
ctx context.Context,
reasons []identity.Reasoner,
opStats *backupStats,
detailsStore streamstore.Streamer,
deets *details.Builder,
@ -334,6 +353,7 @@ func (op *BackupOperation) doPersistence(
err = op.createBackupModels(
ctx,
reasons,
detailsStore,
*opStats,
op.Results.BackupID,
@@ -905,6 +925,7 @@ func (op *BackupOperation) persistResults(
// stores the operation details, results, and selectors in the backup manifest.
func (op *BackupOperation) createBackupModels(
ctx context.Context,
reasons []identity.Reasoner,
sscw streamstore.CollectorWriter,
opStats backupStats,
backupID model.StableID,
@@ -948,7 +969,12 @@ func (op *BackupOperation) createBackupModels(
return clues.Wrap(err, "collecting errors for persistence")
}
ssid, err := sscw.Write(ctx, errs)
metadataReasons := make([]identity.Reasoner, 0, len(reasons))
for _, reason := range reasons {
metadataReasons = append(metadataReasons, reason.ToMetadata())
}
ssid, err := sscw.Write(ctx, metadataReasons, errs)
if err != nil {
return clues.Wrap(err, "persisting details and errors")
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@@ -177,7 +178,16 @@ func (suite *StreamStoreIntgSuite) TestStreamer() {
require.NoError(t, err)
}
snapid, err := ss.Write(ctx, fault.New(true))
snapid, err := ss.Write(
ctx,
[]identity.Reasoner{
identity.NewReason(
deetsPath.Tenant(),
deetsPath.ProtectedResource(),
path.ExchangeMetadataService,
deetsPath.Category()),
},
fault.New(true))
require.NoError(t, err)
test.hasSnapID(t, snapid)

View File

@@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/streamstore"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/fault"
)
@@ -57,7 +58,11 @@ func (ms Streamer) Read(
return col.Unmr(io.NopCloser(bytes.NewReader(bs)))
}
func (ms Streamer) Write(context.Context, *fault.Bus) (string, error) {
func (ms Streamer) Write(
context.Context,
[]identity.Reasoner,
*fault.Bus,
) (string, error) {
return "", clues.New("not implemented")
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@@ -62,10 +63,14 @@ func (ss *storeStreamer) Collect(ctx context.Context, col Collectable) error {
}
// Write persists the collected objects in the stream store
func (ss *storeStreamer) Write(ctx context.Context, errs *fault.Bus) (string, error) {
func (ss *storeStreamer) Write(
ctx context.Context,
reasons []identity.Reasoner,
errs *fault.Bus,
) (string, error) {
ctx = clues.Add(ctx, "snapshot_type", "stream store")
id, err := write(ctx, ss.kw, ss.dbcs, errs)
id, err := write(ctx, ss.kw, reasons, ss.dbcs, errs)
if err != nil {
return "", clues.Wrap(err, "writing to stream store")
}
@@ -109,7 +114,7 @@ type Reader interface {
}
type Writer interface {
Write(context.Context, *fault.Bus) (string, error)
Write(context.Context, []identity.Reasoner, *fault.Bus) (string, error)
}
// Marshallers are used to convert structs into bytes to be persisted in the store.
@@ -203,6 +208,7 @@ func collect(
func write(
ctx context.Context,
bup inject.BackupConsumer,
reasons []identity.Reasoner,
dbcs []data.BackupCollection,
errs *fault.Bus,
) (string, error) {
@@ -210,7 +216,7 @@ func write(
backupStats, _, _, err := bup.ConsumeBackupCollections(
ctx,
nil,
reasons,
nil,
dbcs,
prefixmatcher.NopReader[map[string]struct{}](),

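The shape of the interface change, reduced to a compilable sketch (the `Reasoner` and `Bus` types here are stand-ins for `identity.Reasoner` and `fault.Bus`, not the real types):

```go
package main

import (
	"context"
	"fmt"
)

// Reasoner and Bus are illustrative stand-ins only.
type Reasoner interface{ Tenant() string }

type Bus struct{}

// Writer mirrors the updated streamstore interface: reasons are now
// threaded through so ConsumeBackupCollections can derive a unique
// SourceInfo for the streamstore snapshot as well.
type Writer interface {
	Write(context.Context, []Reasoner, *Bus) (string, error)
}

type stubWriter struct{}

func (stubWriter) Write(_ context.Context, rs []Reasoner, _ *Bus) (string, error) {
	return fmt.Sprintf("snapshot-for-%d-reasons", len(rs)), nil
}

func main() {
	var w Writer = stubWriter{}

	id, err := w.Write(context.Background(), nil, &Bus{})
	if err != nil {
		panic(err)
	}

	fmt.Println(id)
}
```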
View File

@@ -209,6 +209,22 @@ func (suite *ServiceCategoryUnitSuite) TestToMetadata() {
input: GroupsService,
expect: GroupsMetadataService,
},
{
input: ExchangeMetadataService,
expect: ExchangeMetadataService,
},
{
input: OneDriveMetadataService,
expect: OneDriveMetadataService,
},
{
input: SharePointMetadataService,
expect: SharePointMetadataService,
},
{
input: GroupsMetadataService,
expect: GroupsMetadataService,
},
{
input: UnknownService,
expect: UnknownService,

View File

@@ -76,6 +76,7 @@ func (svc ServiceType) HumanString() string {
}
func (svc ServiceType) ToMetadata() ServiceType {
//exhaustive:enforce
switch svc {
case ExchangeService:
return ExchangeMetadataService
@@ -85,6 +86,17 @@ func (svc ServiceType) ToMetadata() ServiceType {
return SharePointMetadataService
case GroupsService:
return GroupsMetadataService
case ExchangeMetadataService:
fallthrough
case OneDriveMetadataService:
fallthrough
case SharePointMetadataService:
fallthrough
case GroupsMetadataService:
fallthrough
case UnknownService:
return svc
}
return UnknownService

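Behaviorally, `ToMetadata` is now total over `ServiceType`: base services map to their metadata counterparts, while metadata and unknown services map to themselves, so applying it twice equals applying it once. A reduced, self-contained sketch of that shape (only the Exchange pair is shown):

```go
package main

import "fmt"

type ServiceType int

const (
	UnknownService ServiceType = iota
	ExchangeService
	ExchangeMetadataService
)

// ToMetadata mirrors the updated switch: base services map to their
// metadata counterparts; metadata and unknown services are fixed points.
func (s ServiceType) ToMetadata() ServiceType {
	switch s {
	case ExchangeService:
		return ExchangeMetadataService
	case ExchangeMetadataService, UnknownService:
		return s
	}

	return UnknownService
}

func main() {
	fmt.Println(ExchangeService.ToMetadata() == ExchangeMetadataService)                   // true
	fmt.Println(ExchangeService.ToMetadata().ToMetadata() == ExchangeService.ToMetadata()) // true
}
```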
View File

@@ -803,13 +803,16 @@ func writeBackup(
sstore = streamstore.NewStreamer(kw, tID, serv)
)
err := sstore.Collect(ctx, streamstore.DetailsCollector(deets))
reasons, err := sel.Reasons(tID, false)
require.NoError(t, err, clues.ToCore(err))
err = sstore.Collect(ctx, streamstore.DetailsCollector(deets))
require.NoError(t, err, "collecting details in streamstore")
err = sstore.Collect(ctx, streamstore.FaultErrorsCollector(fe))
require.NoError(t, err, "collecting errors in streamstore")
ssid, err := sstore.Write(ctx, errs)
ssid, err := sstore.Write(ctx, reasons, errs)
require.NoError(t, err, "writing to streamstore")
tags := map[string]string{
@@ -883,13 +886,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupDetails() {
ctx, flush := tester.NewContext(t)
defer flush()
sel := selectors.NewExchangeBackup([]string{brunhilda})
sel.Include(sel.MailFolders(selectors.Any()))
b := writeBackup(
t,
ctx,
suite.kw,
suite.sw,
tenantID, "snapID", test.writeBupID,
selectors.NewExchangeBackup([]string{brunhilda}).Selector,
sel.Selector,
brunhilda, brunhilda,
test.deets,
&fault.Errors{},
@@ -990,13 +996,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupErrors() {
ctx, flush := tester.NewContext(t)
defer flush()
sel := selectors.NewExchangeBackup([]string{brunhilda})
sel.Include(sel.MailFolders(selectors.Any()))
b := writeBackup(
t,
ctx,
suite.kw,
suite.sw,
tenantID, "snapID", test.writeBupID,
selectors.NewExchangeBackup([]string{brunhilda}).Selector,
sel.Selector,
brunhilda, brunhilda,
test.deets,
test.errors,