From 5a05223e90789517b05a1b2d8f5bfb1fabbd3d34 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Tue, 23 Jan 2024 18:46:57 -0800 Subject: [PATCH] Use more unique data in the kopia SourceInfo for snapshots (#5093) Use unique info for the kopia snapshot source information to help isolate snapshots for different resources from each other. This will come in handy for both checkpoint snapshots and regular snapshots in case kopia's retention policy looks up snapshots by their source information. Integration tests for this probably won't pass until the next PR in the series is merged into this PR. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) merge after: * #5091 * #5092 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/.golangci.yml | 6 + src/internal/kopia/conn_test.go | 8 +- src/internal/kopia/wrapper.go | 123 +++++++++++---- src/internal/kopia/wrapper_test.go | 141 ++++++++++++++++++ src/internal/operations/backup.go | 30 +++- src/internal/streamstore/collectables_test.go | 12 +- src/internal/streamstore/mock/mock.go | 7 +- src/internal/streamstore/streamstore.go | 14 +- src/pkg/path/service_category_test.go | 16 ++ src/pkg/path/service_type.go | 12 ++ .../repository/repository_unexported_test.go | 17 ++- 11 files changed, 342 insertions(+), 44 deletions(-) diff --git a/src/.golangci.yml b/src/.golangci.yml index 38e093fdf..470ae3907 100644 --- a/src/.golangci.yml +++ b/src/.golangci.yml @@ -4,6 +4,7 @@ run: linters: enable: - errcheck + - exhaustive - forbidigo - gci - gofmt @@ -25,6 +26,11 @@ linters: - staticcheck linters-settings: + exhaustive: + check: + - switch + default-signifies-exhaustive: 
false + explicit-exhaustive-switch: true gci: sections: - standard diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index 973f28e2c..b3dac0917 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -234,8 +234,8 @@ func (suite *WrapperIntegrationSuite) TestGetPolicyOrDefault_GetsDefault() { }() si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, + Host: "exchangeemail", + UserName: "tenantID-resourceID", Path: "test-path-root", } @@ -271,8 +271,8 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() { // Check the global policy will be the effective policy in future snapshots // for some source info. si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, + Host: "exchangeemail", + UserName: "tenantID-resourceID", Path: "test-path-root", } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 03a4bc888..592fcbdf6 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -16,6 +16,7 @@ import ( "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/kopia/kopia/snapshot/snapshotmaintenance" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "github.com/alcionai/corso/src/internal/common/prefixmatcher" "github.com/alcionai/corso/src/internal/common/readers" @@ -33,13 +34,6 @@ import ( "github.com/alcionai/corso/src/pkg/store" ) -const ( - // TODO(ashmrtnz): These should be some values from upper layer corso, - // possibly corresponding to who is making the backup. - corsoHost = "corso-host" - corsoUser = "corso" -) - // common manifest tags const ( TagBackupID = "backup-id" @@ -202,24 +196,12 @@ func (w Wrapper) ConsumeBackupCollections( return nil, nil, nil, clues.Wrap(err, "building kopia directories") } - // Add some extra tags so we can look things up by reason. - tags := maps.Clone(additionalTags) - if tags == nil { - // Some platforms seem to return nil if the input is nil. 
- tags = map[string]string{} - } - - for _, r := range backupReasons { - for _, k := range tagKeys(r) { - tags[k] = "" - } - } - s, err := w.makeSnapshotWithRoot( ctx, + backupReasons, assistBase, dirTree, - tags, + additionalTags, progress) if err != nil { return nil, nil, nil, err @@ -228,8 +210,72 @@ func (w Wrapper) ConsumeBackupCollections( return s, progress.deets, progress.toMerge, progress.errs.Failure() } +// userAndHost is used as a passing mechanism for values that will be fed into +// kopia's UserName and Host fields for SourceInfo. It exists to avoid returning +// two strings from the hostAndUserFromReasons function. +type userAndHost struct { + user string + host string +} + +func hostAndUserFromReasons(reasons []identity.Reasoner) (userAndHost, error) { + var ( + tenant string + resource string + // reasonMap is a hash set of the concatenation of the service and category. + reasonMap = map[string]struct{}{} + ) + + for i, reason := range reasons { + // Use a check on the iteration index instead of empty string so we can + // differentiate between the first iteration and a reason with an empty + // value (should result in an error if there's another reason with a + // non-empty value). + if i == 0 { + tenant = reason.Tenant() + } else if tenant != reason.Tenant() { + return userAndHost{}, clues.New("multiple tenant IDs in backup reasons"). + With( + "old_tenant_id", tenant, + "new_tenant_id", reason.Tenant()) + } + + if i == 0 { + resource = reason.ProtectedResource() + } else if resource != reason.ProtectedResource() { + return userAndHost{}, clues.New("multiple protected resource IDs in backup reasons"). 
+ With( + "old_resource_id", resource, + "new_resource_id", reason.ProtectedResource()) + } + + dataType := reason.Service().String() + reason.Category().String() + reasonMap[dataType] = struct{}{} + } + + allReasons := maps.Keys(reasonMap) + slices.Sort(allReasons) + + host := strings.Join(allReasons, "-") + user := strings.Join([]string{tenant, resource}, "-") + + if len(user) == 0 || user == "-" { + return userAndHost{}, clues.New("empty user value") + } + + if len(host) == 0 { + return userAndHost{}, clues.New("empty host value") + } + + return userAndHost{ + host: host, + user: user, + }, nil +} + func (w Wrapper) makeSnapshotWithRoot( ctx context.Context, + backupReasons []identity.Reasoner, prevBases []BackupBase, root fs.Directory, addlTags map[string]string, @@ -252,11 +298,24 @@ func (w Wrapper) makeSnapshotWithRoot( snapIDs = append(snapIDs, ent.ItemDataSnapshot.ID) } + // Add some extra tags so we can look things up by reason. + allTags := maps.Clone(addlTags) + if allTags == nil { + // Some platforms seem to return nil if the input is nil. + allTags = map[string]string{} + } + + for _, r := range backupReasons { + for _, k := range tagKeys(r) { + allTags[k] = "" + } + } + ctx = clues.Add( ctx, "num_assist_snapshots", len(prevBases), "assist_snapshot_ids", snapIDs, - "additional_tags", addlTags) + "additional_tags", allTags) if len(snapIDs) > 0 { logger.Ctx(ctx).Info("using snapshots for kopia-assisted incrementals") @@ -266,7 +325,7 @@ func (w Wrapper) makeSnapshotWithRoot( tags := map[string]string{} - for k, v := range addlTags { + for k, v := range allTags { mk, mv := makeTagKV(k) if len(v) == 0 { @@ -276,7 +335,16 @@ func (w Wrapper) makeSnapshotWithRoot( tags[mk] = v } - err := repo.WriteSession( + // Set the SourceInfo to the tenant ID, resource ID, and the concatenation + // of the service/data types being backed up. 
This will give us unique + // values for each set of backups with the assumption that no concurrent + // backups for the same set of things is being run on this repo. + userHost, err := hostAndUserFromReasons(backupReasons) + if err != nil { + return nil, clues.StackWC(ctx, err) + } + + err = repo.WriteSession( ctx, w.c, repo.WriteSessionOptions{ @@ -288,10 +356,9 @@ func (w Wrapper) makeSnapshotWithRoot( }, func(innerCtx context.Context, rw repo.RepositoryWriter) error { si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, - // TODO(ashmrtnz): will this be something useful for snapshot lookups later? - Path: root.Name(), + Host: userHost.host, + UserName: userHost.user, + Path: root.Name(), } trueVal := policy.OptionalBool(true) diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index a1a9852e7..922305f89 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -3,6 +3,7 @@ package kopia import ( "bytes" "context" + "fmt" "io" stdpath "path" "strings" @@ -375,6 +376,146 @@ func (suite *BasicKopiaIntegrationSuite) TestUpdatePersistentConfig() { mutableParams.EpochParameters.MinEpochDuration) } +func (suite *BasicKopiaIntegrationSuite) TestConsumeBackupCollections_SetsSourceInfo() { + table := []struct { + name string + reasons []identity.Reasoner + expectError assert.ErrorAssertionFunc + expectUser string + expectHost string + }{ + { + name: "DifferentDataTypesInService", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-" + testUser, + expectHost: path.ExchangeService.String() + + path.ContactsCategory.String() + "-" + path.ExchangeService.String() + + path.EmailCategory.String(), + }, + { + name: "DifferentServices", + reasons: []identity.Reasoner{ + 
identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.OneDriveService, path.FilesCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-" + testUser, + expectHost: path.ExchangeService.String() + + path.EmailCategory.String() + "-" + path.OneDriveService.String() + + path.FilesCategory.String(), + }, + { + name: "EmptyTenant", + reasons: []identity.Reasoner{ + identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory), + }, + expectError: assert.NoError, + expectUser: "-" + testUser, + expectHost: path.ExchangeService.String() + path.EmailCategory.String(), + }, + { + name: "EmptyResource", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, "", path.ExchangeService, path.EmailCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-", + expectHost: path.ExchangeService.String() + path.EmailCategory.String(), + }, + { + name: "EmptyTenantAndResource Errors", + reasons: []identity.Reasoner{ + identity.NewReason("", "", path.ExchangeService, path.EmailCategory), + }, + expectError: assert.Error, + }, + { + name: "EmptyAndPopulatedTenant Errors", + reasons: []identity.Reasoner{ + identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.Error, + }, + { + name: "DifferentTenants Errors", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant+"1", testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.Error, + }, + { + name: "DifferentResources Errors", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, testUser+"1", path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + 
expectError: assert.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + var cols []data.BackupCollection + + for i, reason := range test.reasons { + colPath, err := path.Build( + testTenant, + testUser, + reason.Service(), + reason.Category(), + false, + fmt.Sprintf("%d", i)) + require.NoError(t, err, clues.ToCore(err)) + + cols = append(cols, exchMock.NewCollection(colPath, colPath, 0)) + } + + c, err := openLocalKopiaRepo(t, ctx) + require.NoError(t, err, clues.ToCore(err)) + + wrapper := &Wrapper{c} + + defer wrapper.Close(ctx) + + stats, _, _, err := wrapper.ConsumeBackupCollections( + ctx, + test.reasons, + nil, + cols, + nil, + nil, + true, + count.New(), + fault.New(true)) + test.expectError(t, err, clues.ToCore(err)) + + if err != nil { + return + } + + snap, err := snapshot.LoadSnapshot( + ctx, + wrapper.c, + manifest.ID(stats.SnapshotID)) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, test.expectHost, snap.Source.Host, "source host") + assert.Equal(t, test.expectUser, snap.Source.UserName, "source user") + }) + } +} + // --------------- // integration tests that require object locking to be enabled on the bucket. // --------------- diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index b800fc1f2..a18e121ab 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -305,8 +305,26 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { op.Errors.Fail(clues.Wrap(err, "running backup")) } + // Not expected to fail here as we make a similar call in op.do but we do need + // to make sure we pass some sort of reason for persisting the streamstore + // data. 
+ reasons, err := op.Selectors.Reasons(op.account.ID(), false) + if err != nil { + reasons = []identity.Reasoner{identity.NewReason( + op.account.ID(), + op.Selectors.ID(), + path.UnknownService, + path.UnknownCategory)} + } + LogFaultErrors(ctx, op.Errors.Errors(), "running backup") - op.doPersistence(ctx, &opStats, sstore, deets, startTime) + op.doPersistence( + ctx, + reasons, + &opStats, + sstore, + deets, + startTime) finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup") logger.Ctx(ctx).Infow( @@ -319,6 +337,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { func (op *BackupOperation) doPersistence( ctx context.Context, + reasons []identity.Reasoner, opStats *backupStats, detailsStore streamstore.Streamer, deets *details.Builder, @@ -334,6 +353,7 @@ func (op *BackupOperation) doPersistence( err = op.createBackupModels( ctx, + reasons, detailsStore, *opStats, op.Results.BackupID, @@ -905,6 +925,7 @@ func (op *BackupOperation) persistResults( // stores the operation details, results, and selectors in the backup manifest. 
func (op *BackupOperation) createBackupModels( ctx context.Context, + reasons []identity.Reasoner, sscw streamstore.CollectorWriter, opStats backupStats, backupID model.StableID, @@ -948,7 +969,12 @@ func (op *BackupOperation) createBackupModels( return clues.Wrap(err, "collecting errors for persistence") } - ssid, err := sscw.Write(ctx, errs) + metadataReasons := make([]identity.Reasoner, 0, len(reasons)) + for _, reason := range reasons { + metadataReasons = append(metadataReasons, reason.ToMetadata()) + } + + ssid, err := sscw.Write(ctx, metadataReasons, errs) if err != nil { return clues.Wrap(err, "persisting details and errors") } diff --git a/src/internal/streamstore/collectables_test.go b/src/internal/streamstore/collectables_test.go index 848e0642f..74e95f31a 100644 --- a/src/internal/streamstore/collectables_test.go +++ b/src/internal/streamstore/collectables_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/control/repository" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -177,7 +178,16 @@ func (suite *StreamStoreIntgSuite) TestStreamer() { require.NoError(t, err) } - snapid, err := ss.Write(ctx, fault.New(true)) + snapid, err := ss.Write( + ctx, + []identity.Reasoner{ + identity.NewReason( + deetsPath.Tenant(), + deetsPath.ProtectedResource(), + path.ExchangeMetadataService, + deetsPath.Category()), + }, + fault.New(true)) require.NoError(t, err) test.hasSnapID(t, snapid) diff --git a/src/internal/streamstore/mock/mock.go b/src/internal/streamstore/mock/mock.go index 5b4cf7ac1..270fb5267 100644 --- a/src/internal/streamstore/mock/mock.go +++ b/src/internal/streamstore/mock/mock.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/src/internal/streamstore" 
"github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/fault" ) @@ -57,7 +58,11 @@ func (ms Streamer) Read( return col.Unmr(io.NopCloser(bytes.NewReader(bs))) } -func (ms Streamer) Write(context.Context, *fault.Bus) (string, error) { +func (ms Streamer) Write( + context.Context, + []identity.Reasoner, + *fault.Bus, +) (string, error) { return "", clues.New("not implemented") } diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a0686c0c3..9811ab442 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia/inject" "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -62,10 +63,14 @@ func (ss *storeStreamer) Collect(ctx context.Context, col Collectable) error { } // Write persists the collected objects in the stream store -func (ss *storeStreamer) Write(ctx context.Context, errs *fault.Bus) (string, error) { +func (ss *storeStreamer) Write( + ctx context.Context, + reasons []identity.Reasoner, + errs *fault.Bus, +) (string, error) { ctx = clues.Add(ctx, "snapshot_type", "stream store") - id, err := write(ctx, ss.kw, ss.dbcs, errs) + id, err := write(ctx, ss.kw, reasons, ss.dbcs, errs) if err != nil { return "", clues.Wrap(err, "writing to stream store") } @@ -109,7 +114,7 @@ type Reader interface { } type Writer interface { - Write(context.Context, *fault.Bus) (string, error) + Write(context.Context, []identity.Reasoner, *fault.Bus) (string, error) } // Marshallers are used to convert structs into bytes to be persisted in the store. 
@@ -203,6 +208,7 @@ func collect( func write( ctx context.Context, bup inject.BackupConsumer, + reasons []identity.Reasoner, dbcs []data.BackupCollection, errs *fault.Bus, ) (string, error) { @@ -210,7 +216,7 @@ func write( backupStats, _, _, err := bup.ConsumeBackupCollections( ctx, - nil, + reasons, nil, dbcs, prefixmatcher.NopReader[map[string]struct{}](), diff --git a/src/pkg/path/service_category_test.go b/src/pkg/path/service_category_test.go index ceffe2f3c..789e1a2f4 100644 --- a/src/pkg/path/service_category_test.go +++ b/src/pkg/path/service_category_test.go @@ -209,6 +209,22 @@ func (suite *ServiceCategoryUnitSuite) TestToMetadata() { input: GroupsService, expect: GroupsMetadataService, }, + { + input: ExchangeMetadataService, + expect: ExchangeMetadataService, + }, + { + input: OneDriveMetadataService, + expect: OneDriveMetadataService, + }, + { + input: SharePointMetadataService, + expect: SharePointMetadataService, + }, + { + input: GroupsMetadataService, + expect: GroupsMetadataService, + }, { input: UnknownService, expect: UnknownService, diff --git a/src/pkg/path/service_type.go b/src/pkg/path/service_type.go index 1c9e2b73a..b68e60ad6 100644 --- a/src/pkg/path/service_type.go +++ b/src/pkg/path/service_type.go @@ -76,6 +76,7 @@ func (svc ServiceType) HumanString() string { } func (svc ServiceType) ToMetadata() ServiceType { + //exhaustive:enforce switch svc { case ExchangeService: return ExchangeMetadataService @@ -85,6 +86,17 @@ func (svc ServiceType) ToMetadata() ServiceType { return SharePointMetadataService case GroupsService: return GroupsMetadataService + + case ExchangeMetadataService: + fallthrough + case OneDriveMetadataService: + fallthrough + case SharePointMetadataService: + fallthrough + case GroupsMetadataService: + fallthrough + case UnknownService: + return svc } return UnknownService diff --git a/src/pkg/repository/repository_unexported_test.go b/src/pkg/repository/repository_unexported_test.go index d57db9cf5..1346bd17a 100644 
--- a/src/pkg/repository/repository_unexported_test.go +++ b/src/pkg/repository/repository_unexported_test.go @@ -803,13 +803,16 @@ func writeBackup( sstore = streamstore.NewStreamer(kw, tID, serv) ) - err := sstore.Collect(ctx, streamstore.DetailsCollector(deets)) + reasons, err := sel.Reasons(tID, false) + require.NoError(t, err, clues.ToCore(err)) + + err = sstore.Collect(ctx, streamstore.DetailsCollector(deets)) require.NoError(t, err, "collecting details in streamstore") err = sstore.Collect(ctx, streamstore.FaultErrorsCollector(fe)) require.NoError(t, err, "collecting errors in streamstore") - ssid, err := sstore.Write(ctx, errs) + ssid, err := sstore.Write(ctx, reasons, errs) require.NoError(t, err, "writing to streamstore") tags := map[string]string{ @@ -883,13 +886,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupDetails() { ctx, flush := tester.NewContext(t) defer flush() + sel := selectors.NewExchangeBackup([]string{brunhilda}) + sel.Include(sel.MailFolders(selectors.Any())) + b := writeBackup( t, ctx, suite.kw, suite.sw, tenantID, "snapID", test.writeBupID, - selectors.NewExchangeBackup([]string{brunhilda}).Selector, + sel.Selector, brunhilda, brunhilda, test.deets, &fault.Errors{}, @@ -990,13 +996,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupErrors() { ctx, flush := tester.NewContext(t) defer flush() + sel := selectors.NewExchangeBackup([]string{brunhilda}) + sel.Include(sel.MailFolders(selectors.Any())) + b := writeBackup( t, ctx, suite.kw, suite.sw, tenantID, "snapID", test.writeBupID, - selectors.NewExchangeBackup([]string{brunhilda}).Selector, + sel.Selector, brunhilda, brunhilda, test.deets, test.errors,