From 09aebc4a31601e3ea2709adab2e57a14de7a5184 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Thu, 21 Sep 2023 15:15:29 -0700 Subject: [PATCH] Wire backup cleanup into maintenance operation (#4091) Wire backup cleanup into maintenance operation. Currently it runs under a different mode so it won't affect existing users of the maintenance command. Once we're satisfied with testing we can remove the temporary const and run this when complete maintenance is run --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [x] :clock1: Yes, but in a later PR - [ ] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * #3217 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/kopia/cleanup_backups.go | 5 + src/internal/kopia/wrapper.go | 18 ++ src/internal/kopia/wrapper_test.go | 26 ++- src/internal/operations/maintenance.go | 10 +- src/internal/operations/maintenance_test.go | 216 ++++++++++++++++++-- src/pkg/control/repository/repo.go | 7 +- src/pkg/repository/repository.go | 1 + 7 files changed, 252 insertions(+), 31 deletions(-) diff --git a/src/internal/kopia/cleanup_backups.go b/src/internal/kopia/cleanup_backups.go index 03a642024..af3be7d2f 100644 --- a/src/internal/kopia/cleanup_backups.go +++ b/src/internal/kopia/cleanup_backups.go @@ -57,6 +57,11 @@ func cleanupOrphanedData( gcBuffer time.Duration, nowFunc func() time.Time, ) error { + logger.Ctx(ctx).Infow( + "cleaning up failed and incomplete backups", + "current_time", nowFunc(), + "buffer_duration", gcBuffer) + // Get all snapshot manifests. 
snaps, err := mf.FindManifests( ctx, diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 28ee1e552..24e0708b5 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -4,6 +4,7 @@ import ( "context" "errors" "strings" + "time" "github.com/alcionai/clues" "github.com/kopia/kopia/fs" @@ -575,6 +576,7 @@ func isErrEntryNotFound(err error) bool { func (w Wrapper) RepoMaintenance( ctx context.Context, + storer store.Storer, opts repository.Maintenance, ) error { kopiaSafety, err := translateSafety(opts.Safety) @@ -596,6 +598,22 @@ func (w Wrapper) RepoMaintenance( "force", opts.Force, "current_local_owner", clues.Hide(currentOwner)) + // Check if we should do additional cleanup prior to running kopia's + // maintenance. + if opts.Type == repository.CompleteMaintenance { + buffer := time.Hour * 24 * 7 + if opts.CleanupBuffer != nil { + buffer = *opts.CleanupBuffer + } + + // Even if we fail this we don't want to fail the overall maintenance + // operation since there's other useful work we can still do. + if err := cleanupOrphanedData(ctx, storer, w.c, buffer, time.Now); err != nil { + logger.CtxErr(ctx, err).Info( + "cleaning up failed backups, some space may not be freed") + } + } + dr, ok := w.c.Repository.(repo.DirectRepository) if !ok { return clues.New("unable to get valid handle to repo").WithClues(ctx) diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index a1af89a0d..452bc4ffa 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -194,7 +194,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_FirstRun_NoChanges() { Type: repository.MetadataMaintenance, } - err = w.RepoMaintenance(ctx, opts) + err = w.RepoMaintenance(ctx, nil, opts) require.NoError(t, err, clues.ToCore(err)) } @@ -215,7 +215,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails } // This will set the user. 
- err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) require.NoError(t, err, clues.ToCore(err)) err = k.Close(ctx) @@ -231,7 +231,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails var notOwnedErr maintenance.NotOwnedError - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) assert.ErrorAs(t, err, &notOwnedErr, clues.ToCore(err)) } @@ -252,7 +252,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed } // This will set the user. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) require.NoError(t, err, clues.ToCore(err)) err = k.Close(ctx) @@ -269,13 +269,13 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed mOpts.Force = true // This will set the user. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) require.NoError(t, err, clues.ToCore(err)) mOpts.Force = false // Running without force should succeed now. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) require.NoError(t, err, clues.ToCore(err)) } @@ -546,7 +546,7 @@ func (suite *RetentionIntegrationSuite) TestSetRetentionParameters_And_Maintenan // This will set common maintenance config parameters. There's some interplay // between the maintenance schedule and retention period that we want to check // below. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, nil, mOpts) require.NoError(t, err, clues.ToCore(err)) // Enable retention. 
@@ -640,6 +640,9 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM w := &Wrapper{k} + ms, err := NewModelStore(k) + require.NoError(t, err, "getting model store: %s", clues.ToCore(err)) + mOpts := repository.Maintenance{ Safety: repository.FullMaintenanceSafety, Type: repository.CompleteMaintenance, @@ -648,7 +651,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM // This will set common maintenance config parameters. There's some interplay // between the maintenance schedule and retention period that we want to check // below. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, ms, mOpts) require.NoError(t, err, clues.ToCore(err)) // Enable retention. @@ -667,10 +670,15 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM time.Hour*48, assert.True) + err = ms.Close(ctx) + require.NoError(t, err, clues.ToCore(err)) + if test.reopen { mustReopen(t, ctx, w) } + ms.c = w.c + // Change retention duration without updating mode. err = w.SetRetentionParameters(ctx, repository.Retention{ Duration: ptr.To(time.Hour * 96), @@ -687,7 +695,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM // Run full maintenance again. This should extend object locks for things if // they exist. - err = w.RepoMaintenance(ctx, mOpts) + err = w.RepoMaintenance(ctx, ms, mOpts) require.NoError(t, err, clues.ToCore(err)) }) } diff --git a/src/internal/operations/maintenance.go b/src/internal/operations/maintenance.go index a00f13272..4c4003733 100644 --- a/src/internal/operations/maintenance.go +++ b/src/internal/operations/maintenance.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/repository" "github.com/alcionai/corso/src/pkg/count" + "github.com/alcionai/corso/src/pkg/store" ) // MaintenanceOperation wraps an operation with restore-specific props. 
@@ -33,17 +34,18 @@ func NewMaintenanceOperation( ctx context.Context, opts control.Options, kw *kopia.Wrapper, + storer store.BackupStorer, mOpts repository.Maintenance, bus events.Eventer, ) (MaintenanceOperation, error) { op := MaintenanceOperation{ - operation: newOperation(opts, bus, count.New(), kw, nil), + operation: newOperation(opts, bus, count.New(), kw, storer), mOpts: mOpts, } - // Don't run validation because we don't populate the model store. + err := op.validate() - return op, nil + return op, clues.Stack(err).OrNil() } func (op *MaintenanceOperation) Run(ctx context.Context) (err error) { @@ -83,7 +85,7 @@ func (op *MaintenanceOperation) do(ctx context.Context) error { op.Results.CompletedAt = time.Now() }() - err := op.operation.kopia.RepoMaintenance(ctx, op.mOpts) + err := op.operation.kopia.RepoMaintenance(ctx, op.store, op.mOpts) if err != nil { op.Status = Failed return clues.Wrap(err, "running maintenance operation") diff --git a/src/internal/operations/maintenance_test.go b/src/internal/operations/maintenance_test.go index 6c822e1cd..9959e8fb6 100644 --- a/src/internal/operations/maintenance_test.go +++ b/src/internal/operations/maintenance_test.go @@ -1,22 +1,60 @@ package operations import ( + "context" + "fmt" "testing" + "time" "github.com/alcionai/clues" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/data" + dataMock "github.com/alcionai/corso/src/internal/data/mock" evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/m365/graph" + odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/internal/model" + opMock "github.com/alcionai/corso/src/internal/operations/inject/mock" 
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/repository" + "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" + selTD "github.com/alcionai/corso/src/pkg/selectors/testdata" storeTD "github.com/alcionai/corso/src/pkg/storage/testdata" + "github.com/alcionai/corso/src/pkg/store" ) +func getKopiaHandles( + t *testing.T, + ctx context.Context, //revive:disable-line:context-as-argument +) (*kopia.Wrapper, *kopia.ModelStore) { + st := storeTD.NewPrefixedS3Storage(t) + k := kopia.NewConn(st) + err := k.Initialize(ctx, repository.Options{}, repository.Retention{}) + require.NoError(t, err, clues.ToCore(err)) + + kw, err := kopia.NewWrapper(k) + // kopiaRef comes with a count of 1 and Wrapper bumps it again so safe + // to close here. + k.Close(ctx) + + require.NoError(t, err, "getting kopia wrapper: %s", clues.ToCore(err)) + + ms, err := kopia.NewModelStore(k) + require.NoError(t, err, "getting model store: %s", clues.ToCore(err)) + + return kw, ms +} + type MaintenanceOpIntegrationSuite struct { tester.Suite } @@ -30,32 +68,21 @@ func TestMaintenanceOpIntegrationSuite(t *testing.T) { } func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() { - var ( - t = suite.T() - // need to initialize the repository before we can test connecting to it. - st = storeTD.NewPrefixedS3Storage(t) - k = kopia.NewConn(st) - ) + t := suite.T() ctx, flush := tester.NewContext(t) defer flush() - err := k.Initialize(ctx, repository.Options{}, repository.Retention{}) - require.NoError(t, err, clues.ToCore(err)) - - kw, err := kopia.NewWrapper(k) - // kopiaRef comes with a count of 1 and Wrapper bumps it again so safe - // to close here. 
- k.Close(ctx) - - require.NoError(t, err, clues.ToCore(err)) + kw, ms := getKopiaHandles(t, ctx) defer kw.Close(ctx) + defer ms.Close(ctx) mo, err := NewMaintenanceOperation( ctx, control.DefaultOptions(), kw, + store.NewWrapper(ms), repository.Maintenance{ Type: repository.MetadataMaintenance, }, @@ -65,3 +92,162 @@ func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() { err = mo.Run(ctx) assert.NoError(t, err, clues.ToCore(err)) } + +type MaintenanceOpNightlySuite struct { + tester.Suite +} + +func TestMaintenanceOpNightlySuite(t *testing.T) { + suite.Run(t, &MaintenanceOpNightlySuite{ + Suite: tester.NewNightlySuite( + t, + [][]string{storeTD.AWSStorageCredEnvs, tconfig.M365AcctCredEnvs}), + }) +} + +func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection() { + var ( + t = suite.T() + acct = tconfig.NewM365Account(suite.T()) + tenantID = acct.Config[account.AzureTenantIDKey] + opts = control.DefaultOptions() + osel = selectors.NewOneDriveBackup([]string{userID}) + // Default policy used by SDK clients + failurePolicy = control.FailAfterRecovery + T1 = time.Now().Truncate(0) + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + osel.Include(selTD.OneDriveBackupFolderScope(osel)) + + pathElements := []string{odConsts.DrivesPathDir, "drive-id", odConsts.RootPathDir, folderID} + + tmp, err := path.Build(tenantID, userID, path.OneDriveService, path.FilesCategory, false, pathElements...) + require.NoError(suite.T(), err, clues.ToCore(err)) + + locPath := path.Builder{}.Append(tmp.Folders()...) + + kw, ms := getKopiaHandles(t, ctx) + storer := store.NewWrapper(ms) + + var bupIDs []model.StableID + + // Make two failed backups so the garbage collection code will try to delete + // something. 
+ for i := 0; i < 2; i++ { + suite.Run(fmt.Sprintf("Setup%d", i), func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + cs := []data.BackupCollection{ + makeBackupCollection( + tmp, + locPath, + []dataMock.Item{ + makeMockItem("file1", nil, T1, false, nil), + makeMockItem("file2", nil, T1, false, assert.AnError), + }), + } + + prefixPath, err := path.BuildMetadata( + tenantID, + userID, + path.OneDriveService, + path.FilesCategory, + false) + require.NoError(t, err, clues.ToCore(err)) + + mc, err := graph.MakeMetadataCollection( + prefixPath, + makeMetadataCollectionEntries("url/1", driveID, folderID, tmp), + func(*support.ControllerOperationStatus) {}) + require.NoError(t, err, clues.ToCore(err)) + + cs = append(cs, mc) + bp := opMock.NewMockBackupProducer(cs, data.CollectionStats{}, false) + + opts.FailureHandling = failurePolicy + + bo, err := NewBackupOperation( + ctx, + opts, + kw, + storer, + &bp, + acct, + osel.Selector, + selectors.Selector{DiscreteOwner: userID}, + evmock.NewBus()) + require.NoError(t, err, clues.ToCore(err)) + + err = bo.Run(ctx) + assert.Error(t, err, clues.ToCore(err)) + + require.NotEmpty(t, bo.Results.BackupID) + + bupIDs = append(bupIDs, bo.Results.BackupID) + }) + } + + // Double check we have two backup models. This is not an exhaustive check but + // will give us some comfort that things are working as expected. + bups, err := storer.GetBackups(ctx) + require.NoError( + t, + err, + "checking backup model existence: %s", + clues.ToCore(err)) + + var gotBupIDs []model.StableID + + for _, bup := range bups { + gotBupIDs = append(gotBupIDs, bup.ID) + } + + require.ElementsMatch(t, bupIDs, gotBupIDs) + + // Run maintenance with garbage collection. 
+ + suite.Run("RunMaintenance", func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + mo, err := NewMaintenanceOperation( + ctx, + control.DefaultOptions(), + kw, + store.NewWrapper(ms), + repository.Maintenance{ + Type: repository.CompleteMaintenance, + // Set buffer to 0 so things will actually be garbage collected. + CleanupBuffer: ptr.To(time.Duration(0)), + }, + evmock.NewBus()) + require.NoError(t, err, clues.ToCore(err)) + + err = mo.Run(ctx) + assert.NoError(t, err, clues.ToCore(err)) + + // Check for backup models again. Only the second one should still be present. + bups, err = storer.GetBackups(ctx) + require.NoError( + t, + err, + "checking backup model existence after maintenance: %s", + clues.ToCore(err)) + + gotBupIDs = nil + + for _, bup := range bups { + gotBupIDs = append(gotBupIDs, bup.ID) + } + + assert.ElementsMatch(t, bupIDs[1:], gotBupIDs) + }) +} diff --git a/src/pkg/control/repository/repo.go b/src/pkg/control/repository/repo.go index 791e05131..9f39278d9 100644 --- a/src/pkg/control/repository/repo.go +++ b/src/pkg/control/repository/repo.go @@ -15,9 +15,10 @@ type Options struct { } type Maintenance struct { - Type MaintenanceType `json:"type"` - Safety MaintenanceSafety `json:"safety"` - Force bool `json:"force"` + Type MaintenanceType `json:"type"` + Safety MaintenanceSafety `json:"safety"` + Force bool `json:"force"` + CleanupBuffer *time.Duration } // --------------------------------------------------------------------------- diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 4cd361e07..1189a3ef1 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -414,6 +414,7 @@ func (r repository) NewMaintenance( ctx, r.Opts, r.dataLayer, + store.NewWrapper(r.modelStore), mOpts, r.Bus) }