diff --git a/src/.golangci.yml b/src/.golangci.yml index 38e093fdf..470ae3907 100644 --- a/src/.golangci.yml +++ b/src/.golangci.yml @@ -4,6 +4,7 @@ run: linters: enable: - errcheck + - exhaustive - forbidigo - gci - gofmt @@ -25,6 +26,11 @@ linters: - staticcheck linters-settings: + exhaustive: + check: + - switch + default-signifies-exhaustive: false + explicit-exhaustive-switch: true gci: sections: - standard diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index 973f28e2c..b3dac0917 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -234,8 +234,8 @@ func (suite *WrapperIntegrationSuite) TestGetPolicyOrDefault_GetsDefault() { }() si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, + Host: "exchangeemail", + UserName: "tenantID-resourceID", Path: "test-path-root", } @@ -271,8 +271,8 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() { // Check the global policy will be the effective policy in future snapshots // for some source info. si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, + Host: "exchangeemail", + UserName: "tenantID-resourceID", Path: "test-path-root", } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 03a4bc888..592fcbdf6 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -16,6 +16,7 @@ import ( "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/kopia/kopia/snapshot/snapshotmaintenance" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" "github.com/alcionai/corso/src/internal/common/prefixmatcher" "github.com/alcionai/corso/src/internal/common/readers" @@ -33,13 +34,6 @@ import ( "github.com/alcionai/corso/src/pkg/store" ) -const ( - // TODO(ashmrtnz): These should be some values from upper layer corso, - // possibly corresponding to who is making the backup. - corsoHost = "corso-host" - corsoUser = "corso" -) - // common manifest tags const ( TagBackupID = "backup-id" @@ -202,24 +196,12 @@ func (w Wrapper) ConsumeBackupCollections( return nil, nil, nil, clues.Wrap(err, "building kopia directories") } - // Add some extra tags so we can look things up by reason. - tags := maps.Clone(additionalTags) - if tags == nil { - // Some platforms seem to return nil if the input is nil. - tags = map[string]string{} - } - - for _, r := range backupReasons { - for _, k := range tagKeys(r) { - tags[k] = "" - } - } - s, err := w.makeSnapshotWithRoot( ctx, + backupReasons, assistBase, dirTree, - tags, + additionalTags, progress) if err != nil { return nil, nil, nil, err @@ -228,8 +210,72 @@ func (w Wrapper) ConsumeBackupCollections( return s, progress.deets, progress.toMerge, progress.errs.Failure() } +// userAndHost is used as a passing mechanism for values that will be fed into +// kopia's UserName and Host fields for SourceInfo. It exists to avoid returning +// two strings from the hostAndUserFromReasons function. +type userAndHost struct { + user string + host string +} + +func hostAndUserFromReasons(reasons []identity.Reasoner) (userAndHost, error) { + var ( + tenant string + resource string + // reasonMap is a hash set of the concatenation of the service and category. + reasonMap = map[string]struct{}{} + ) + + for i, reason := range reasons { + // Use a check on the iteration index instead of empty string so we can + // differentiate between the first iteration and a reason with an empty + // value (should result in an error if there's another reason with a + // non-empty value). + if i == 0 { + tenant = reason.Tenant() + } else if tenant != reason.Tenant() { + return userAndHost{}, clues.New("multiple tenant IDs in backup reasons"). + With( + "old_tenant_id", tenant, + "new_tenant_id", reason.Tenant()) + } + + if i == 0 { + resource = reason.ProtectedResource() + } else if resource != reason.ProtectedResource() { + return userAndHost{}, clues.New("multiple protected resource IDs in backup reasons"). + With( + "old_resource_id", resource, + "new_resource_id", reason.ProtectedResource()) + } + + dataType := reason.Service().String() + reason.Category().String() + reasonMap[dataType] = struct{}{} + } + + allReasons := maps.Keys(reasonMap) + slices.Sort(allReasons) + + host := strings.Join(allReasons, "-") + user := strings.Join([]string{tenant, resource}, "-") + + if len(user) == 0 || user == "-" { + return userAndHost{}, clues.New("empty user value") + } + + if len(host) == 0 { + return userAndHost{}, clues.New("empty host value") + } + + return userAndHost{ + host: host, + user: user, + }, nil +} + func (w Wrapper) makeSnapshotWithRoot( ctx context.Context, + backupReasons []identity.Reasoner, prevBases []BackupBase, root fs.Directory, addlTags map[string]string, @@ -252,11 +298,24 @@ func (w Wrapper) makeSnapshotWithRoot( snapIDs = append(snapIDs, ent.ItemDataSnapshot.ID) } + // Add some extra tags so we can look things up by reason. + allTags := maps.Clone(addlTags) + if allTags == nil { + // Some platforms seem to return nil if the input is nil. + allTags = map[string]string{} + } + + for _, r := range backupReasons { + for _, k := range tagKeys(r) { + allTags[k] = "" + } + } + ctx = clues.Add( ctx, "num_assist_snapshots", len(prevBases), "assist_snapshot_ids", snapIDs, - "additional_tags", addlTags) + "additional_tags", allTags) if len(snapIDs) > 0 { logger.Ctx(ctx).Info("using snapshots for kopia-assisted incrementals") @@ -266,7 +325,7 @@ func (w Wrapper) makeSnapshotWithRoot( tags := map[string]string{} - for k, v := range addlTags { + for k, v := range allTags { mk, mv := makeTagKV(k) if len(v) == 0 { @@ -276,7 +335,16 @@ func (w Wrapper) makeSnapshotWithRoot( tags[mk] = v } - err := repo.WriteSession( + // Set the SourceInfo to the tenant ID, resource ID, and the concatenation + // of the service/data types being backed up. This will give us unique + // values for each set of backups with the assumption that no concurrent + // backups for the same set of things is being run on this repo. + userHost, err := hostAndUserFromReasons(backupReasons) + if err != nil { + return nil, clues.StackWC(ctx, err) + } + + err = repo.WriteSession( ctx, w.c, repo.WriteSessionOptions{ @@ -288,10 +356,9 @@ func (w Wrapper) makeSnapshotWithRoot( }, func(innerCtx context.Context, rw repo.RepositoryWriter) error { si := snapshot.SourceInfo{ - Host: corsoHost, - UserName: corsoUser, - // TODO(ashmrtnz): will this be something useful for snapshot lookups later? - Path: root.Name(), + Host: userHost.host, + UserName: userHost.user, + Path: root.Name(), } trueVal := policy.OptionalBool(true) diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index a1a9852e7..922305f89 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -3,6 +3,7 @@ package kopia import ( "bytes" "context" + "fmt" "io" stdpath "path" "strings" @@ -375,6 +376,146 @@ func (suite *BasicKopiaIntegrationSuite) TestUpdatePersistentConfig() { mutableParams.EpochParameters.MinEpochDuration) } +func (suite *BasicKopiaIntegrationSuite) TestConsumeBackupCollections_SetsSourceInfo() { + table := []struct { + name string + reasons []identity.Reasoner + expectError assert.ErrorAssertionFunc + expectUser string + expectHost string + }{ + { + name: "DifferentDataTypesInService", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-" + testUser, + expectHost: path.ExchangeService.String() + + path.ContactsCategory.String() + "-" + path.ExchangeService.String() + + path.EmailCategory.String(), + }, + { + name: "DifferentServices", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.OneDriveService, path.FilesCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-" + testUser, + expectHost: path.ExchangeService.String() + + path.EmailCategory.String() + "-" + path.OneDriveService.String() + + path.FilesCategory.String(), + }, + { + name: "EmptyTenant", + reasons: []identity.Reasoner{ + identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory), + }, + expectError: assert.NoError, + expectUser: "-" + testUser, + expectHost: path.ExchangeService.String() + path.EmailCategory.String(), + }, + { + name: "EmptyResource", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, "", path.ExchangeService, path.EmailCategory), + }, + expectError: assert.NoError, + expectUser: testTenant + "-", + expectHost: path.ExchangeService.String() + path.EmailCategory.String(), + }, + { + name: "EmptyTenantAndResource Errors", + reasons: []identity.Reasoner{ + identity.NewReason("", "", path.ExchangeService, path.EmailCategory), + }, + expectError: assert.Error, + }, + { + name: "EmptyAndPopulatedTenant Errors", + reasons: []identity.Reasoner{ + identity.NewReason("", testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.Error, + }, + { + name: "DifferentTenants Errors", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant+"1", testUser, path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.Error, + }, + { + name: "DifferentResources Errors", + reasons: []identity.Reasoner{ + identity.NewReason(testTenant, testUser+"1", path.ExchangeService, path.EmailCategory), + identity.NewReason(testTenant, testUser, path.ExchangeService, path.ContactsCategory), + }, + expectError: assert.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + var cols []data.BackupCollection + + for i, reason := range test.reasons { + colPath, err := path.Build( + testTenant, + testUser, + reason.Service(), + reason.Category(), + false, + fmt.Sprintf("%d", i)) + require.NoError(t, err, clues.ToCore(err)) + + cols = append(cols, exchMock.NewCollection(colPath, colPath, 0)) + } + + c, err := openLocalKopiaRepo(t, ctx) + require.NoError(t, err, clues.ToCore(err)) + + wrapper := &Wrapper{c} + + defer wrapper.Close(ctx) + + stats, _, _, err := wrapper.ConsumeBackupCollections( + ctx, + test.reasons, + nil, + cols, + nil, + nil, + true, + count.New(), + fault.New(true)) + test.expectError(t, err, clues.ToCore(err)) + + if err != nil { + return + } + + snap, err := snapshot.LoadSnapshot( + ctx, + wrapper.c, + manifest.ID(stats.SnapshotID)) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, test.expectHost, snap.Source.Host, "source host") + assert.Equal(t, test.expectUser, snap.Source.UserName, "source user") + }) + } +} + // --------------- // integration tests that require object locking to be enabled on the bucket. // --------------- diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index b800fc1f2..a18e121ab 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -305,8 +305,26 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { op.Errors.Fail(clues.Wrap(err, "running backup")) } + // Not expected to fail here as we make a similar call in op.do but we do need + // to make sure we pass some sort of reason for persisting the streamstore + // data. + reasons, err := op.Selectors.Reasons(op.account.ID(), false) + if err != nil { + reasons = []identity.Reasoner{identity.NewReason( + op.account.ID(), + op.Selectors.ID(), + path.UnknownService, + path.UnknownCategory)} + } + LogFaultErrors(ctx, op.Errors.Errors(), "running backup") - op.doPersistence(ctx, &opStats, sstore, deets, startTime) + op.doPersistence( + ctx, + reasons, + &opStats, + sstore, + deets, + startTime) finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup") logger.Ctx(ctx).Infow( @@ -319,6 +337,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { func (op *BackupOperation) doPersistence( ctx context.Context, + reasons []identity.Reasoner, opStats *backupStats, detailsStore streamstore.Streamer, deets *details.Builder, @@ -334,6 +353,7 @@ func (op *BackupOperation) doPersistence( err = op.createBackupModels( ctx, + reasons, detailsStore, *opStats, op.Results.BackupID, @@ -905,6 +925,7 @@ func (op *BackupOperation) persistResults( // stores the operation details, results, and selectors in the backup manifest. func (op *BackupOperation) createBackupModels( ctx context.Context, + reasons []identity.Reasoner, sscw streamstore.CollectorWriter, opStats backupStats, backupID model.StableID, @@ -948,7 +969,12 @@ func (op *BackupOperation) createBackupModels( return clues.Wrap(err, "collecting errors for persistence") } - ssid, err := sscw.Write(ctx, errs) + metadataReasons := make([]identity.Reasoner, 0, len(reasons)) + for _, reason := range reasons { + metadataReasons = append(metadataReasons, reason.ToMetadata()) + } + + ssid, err := sscw.Write(ctx, metadataReasons, errs) if err != nil { return clues.Wrap(err, "persisting details and errors") } diff --git a/src/internal/streamstore/collectables_test.go b/src/internal/streamstore/collectables_test.go index 848e0642f..74e95f31a 100644 --- a/src/internal/streamstore/collectables_test.go +++ b/src/internal/streamstore/collectables_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/control/repository" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -177,7 +178,16 @@ func (suite *StreamStoreIntgSuite) TestStreamer() { require.NoError(t, err) } - snapid, err := ss.Write(ctx, fault.New(true)) + snapid, err := ss.Write( + ctx, + []identity.Reasoner{ + identity.NewReason( + deetsPath.Tenant(), + deetsPath.ProtectedResource(), + path.ExchangeMetadataService, + deetsPath.Category()), + }, + fault.New(true)) require.NoError(t, err) test.hasSnapID(t, snapid) diff --git a/src/internal/streamstore/mock/mock.go b/src/internal/streamstore/mock/mock.go index 5b4cf7ac1..270fb5267 100644 --- a/src/internal/streamstore/mock/mock.go +++ b/src/internal/streamstore/mock/mock.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/fault" ) @@ -57,7 +58,11 @@ func (ms Streamer) Read( return col.Unmr(io.NopCloser(bytes.NewReader(bs))) } -func (ms Streamer) Write(context.Context, *fault.Bus) (string, error) { +func (ms Streamer) Write( + context.Context, + []identity.Reasoner, + *fault.Bus, +) (string, error) { return "", clues.New("not implemented") } diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a0686c0c3..9811ab442 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia/inject" "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -62,10 +63,14 @@ func (ss *storeStreamer) Collect(ctx context.Context, col Collectable) error { } // Write persists the collected objects in the stream store -func (ss *storeStreamer) Write(ctx context.Context, errs *fault.Bus) (string, error) { +func (ss *storeStreamer) Write( + ctx context.Context, + reasons []identity.Reasoner, + errs *fault.Bus, +) (string, error) { ctx = clues.Add(ctx, "snapshot_type", "stream store") - id, err := write(ctx, ss.kw, ss.dbcs, errs) + id, err := write(ctx, ss.kw, reasons, ss.dbcs, errs) if err != nil { return "", clues.Wrap(err, "writing to stream store") } @@ -109,7 +114,7 @@ type Reader interface { } type Writer interface { - Write(context.Context, *fault.Bus) (string, error) + Write(context.Context, []identity.Reasoner, *fault.Bus) (string, error) } // Marshallers are used to convert structs into bytes to be persisted in the store. @@ -203,6 +208,7 @@ func collect( func write( ctx context.Context, bup inject.BackupConsumer, + reasons []identity.Reasoner, dbcs []data.BackupCollection, errs *fault.Bus, ) (string, error) { @@ -210,7 +216,7 @@ func write( backupStats, _, _, err := bup.ConsumeBackupCollections( ctx, - nil, + reasons, nil, dbcs, prefixmatcher.NopReader[map[string]struct{}](), diff --git a/src/pkg/path/service_category_test.go b/src/pkg/path/service_category_test.go index ceffe2f3c..789e1a2f4 100644 --- a/src/pkg/path/service_category_test.go +++ b/src/pkg/path/service_category_test.go @@ -209,6 +209,22 @@ func (suite *ServiceCategoryUnitSuite) TestToMetadata() { input: GroupsService, expect: GroupsMetadataService, }, + { + input: ExchangeMetadataService, + expect: ExchangeMetadataService, + }, + { + input: OneDriveMetadataService, + expect: OneDriveMetadataService, + }, + { + input: SharePointMetadataService, + expect: SharePointMetadataService, + }, + { + input: GroupsMetadataService, + expect: GroupsMetadataService, + }, { input: UnknownService, expect: UnknownService, diff --git a/src/pkg/path/service_type.go b/src/pkg/path/service_type.go index 1c9e2b73a..b68e60ad6 100644 --- a/src/pkg/path/service_type.go +++ b/src/pkg/path/service_type.go @@ -76,6 +76,7 @@ func (svc ServiceType) HumanString() string { } func (svc ServiceType) ToMetadata() ServiceType { + //exhaustive:enforce switch svc { case ExchangeService: return ExchangeMetadataService @@ -85,6 +86,17 @@ func (svc ServiceType) ToMetadata() ServiceType { return SharePointMetadataService case GroupsService: return GroupsMetadataService + + case ExchangeMetadataService: + fallthrough + case OneDriveMetadataService: + fallthrough + case SharePointMetadataService: + fallthrough + case GroupsMetadataService: + fallthrough + case UnknownService: + return svc } return UnknownService diff --git a/src/pkg/repository/repository_unexported_test.go b/src/pkg/repository/repository_unexported_test.go index d57db9cf5..1346bd17a 100644 --- a/src/pkg/repository/repository_unexported_test.go +++ b/src/pkg/repository/repository_unexported_test.go @@ -803,13 +803,16 @@ func writeBackup( sstore = streamstore.NewStreamer(kw, tID, serv) ) - err := sstore.Collect(ctx, streamstore.DetailsCollector(deets)) + reasons, err := sel.Reasons(tID, false) + require.NoError(t, err, clues.ToCore(err)) + + err = sstore.Collect(ctx, streamstore.DetailsCollector(deets)) require.NoError(t, err, "collecting details in streamstore") err = sstore.Collect(ctx, streamstore.FaultErrorsCollector(fe)) require.NoError(t, err, "collecting errors in streamstore") - ssid, err := sstore.Write(ctx, errs) + ssid, err := sstore.Write(ctx, reasons, errs) require.NoError(t, err, "writing to streamstore") tags := map[string]string{ @@ -883,13 +886,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupDetails() { ctx, flush := tester.NewContext(t) defer flush() + sel := selectors.NewExchangeBackup([]string{brunhilda}) + sel.Include(sel.MailFolders(selectors.Any())) + b := writeBackup( t, ctx, suite.kw, suite.sw, tenantID, "snapID", test.writeBupID, - selectors.NewExchangeBackup([]string{brunhilda}).Selector, + sel.Selector, brunhilda, brunhilda, test.deets, &fault.Errors{}, @@ -990,13 +996,16 @@ func (suite *RepositoryModelIntgSuite) TestGetBackupErrors() { ctx, flush := tester.NewContext(t) defer flush() + sel := selectors.NewExchangeBackup([]string{brunhilda}) + sel.Include(sel.MailFolders(selectors.Any())) + b := writeBackup( t, ctx, suite.kw, suite.sw, tenantID, "snapID", test.writeBupID, - selectors.NewExchangeBackup([]string{brunhilda}).Selector, + sel.Selector, brunhilda, brunhilda, test.deets, test.errors,