diff --git a/src/internal/kopia/cleanup_backups.go b/src/internal/kopia/cleanup_backups.go
index 03a642024..af3be7d2f 100644
--- a/src/internal/kopia/cleanup_backups.go
+++ b/src/internal/kopia/cleanup_backups.go
@@ -57,6 +57,11 @@ func cleanupOrphanedData(
 	gcBuffer time.Duration,
 	nowFunc func() time.Time,
 ) error {
+	logger.Ctx(ctx).Infow(
+		"cleaning up failed and incomplete backups",
+		"current_time", nowFunc(),
+		"buffer_duration", gcBuffer)
+
 	// Get all snapshot manifests.
 	snaps, err := mf.FindManifests(
 		ctx,
diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go
index 28ee1e552..24e0708b5 100644
--- a/src/internal/kopia/wrapper.go
+++ b/src/internal/kopia/wrapper.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"strings"
+	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"
@@ -575,6 +576,7 @@ func isErrEntryNotFound(err error) bool {
 
 func (w Wrapper) RepoMaintenance(
 	ctx context.Context,
+	storer store.Storer,
 	opts repository.Maintenance,
 ) error {
 	kopiaSafety, err := translateSafety(opts.Safety)
@@ -596,6 +598,22 @@ func (w Wrapper) RepoMaintenance(
 		"force", opts.Force,
 		"current_local_owner", clues.Hide(currentOwner))
 
+	// Check if we should do additional cleanup prior to running kopia's
+	// maintenance.
+	if opts.Type == repository.CompleteMaintenance {
+		buffer := time.Hour * 24 * 7
+		if opts.CleanupBuffer != nil {
+			buffer = *opts.CleanupBuffer
+		}
+
+		// Even if this cleanup fails we don't want to fail the overall maintenance
+		// operation since there's other useful work we can still do.
+		if err := cleanupOrphanedData(ctx, storer, w.c, buffer, time.Now); err != nil {
+			logger.CtxErr(ctx, err).Info(
+				"error cleaning up failed backups; some space may not be freed")
+		}
+	}
+
 	dr, ok := w.c.Repository.(repo.DirectRepository)
 	if !ok {
 		return clues.New("unable to get valid handle to repo").WithClues(ctx)
diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go
index a1af89a0d..452bc4ffa 100644
--- a/src/internal/kopia/wrapper_test.go
+++ b/src/internal/kopia/wrapper_test.go
@@ -194,7 +194,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_FirstRun_NoChanges() {
 		Type: repository.MetadataMaintenance,
 	}
 
-	err = w.RepoMaintenance(ctx, opts)
+	err = w.RepoMaintenance(ctx, nil, opts)
 	require.NoError(t, err, clues.ToCore(err))
 }
 
@@ -215,7 +215,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails
 	}
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	err = k.Close(ctx)
@@ -231,7 +231,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails
 
 	var notOwnedErr maintenance.NotOwnedError
 
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	assert.ErrorAs(t, err, &notOwnedErr, clues.ToCore(err))
 }
 
@@ -252,7 +252,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed
 	}
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	err = k.Close(ctx)
@@ -269,13 +269,13 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed
 	mOpts.Force = true
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	mOpts.Force = false
 
 	// Running without force should succeed now.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 }
 
@@ -546,7 +546,7 @@ func (suite *RetentionIntegrationSuite) TestSetRetentionParameters_And_Maintenan
 	// This will set common maintenance config parameters. There's some interplay
 	// between the maintenance schedule and retention period that we want to check
 	// below.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	// Enable retention.
@@ -640,6 +640,9 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 
 			w := &Wrapper{k}
 
+			ms, err := NewModelStore(k)
+			require.NoError(t, err, "getting model store: %s", clues.ToCore(err))
+
 			mOpts := repository.Maintenance{
 				Safety: repository.FullMaintenanceSafety,
 				Type:   repository.CompleteMaintenance,
@@ -648,7 +651,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 			// This will set common maintenance config parameters. There's some interplay
 			// between the maintenance schedule and retention period that we want to check
 			// below.
-			err = w.RepoMaintenance(ctx, mOpts)
+			err = w.RepoMaintenance(ctx, ms, mOpts)
 			require.NoError(t, err, clues.ToCore(err))
 
 			// Enable retention.
@@ -667,10 +670,15 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 				time.Hour*48,
 				assert.True)
 
+			err = ms.Close(ctx)
+			require.NoError(t, err, clues.ToCore(err))
+
 			if test.reopen {
 				mustReopen(t, ctx, w)
 			}
 
+			ms.c = w.c
+
 			// Change retention duration without updating mode.
 			err = w.SetRetentionParameters(ctx, repository.Retention{
 				Duration: ptr.To(time.Hour * 96),
@@ -687,7 +695,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 
 			// Run full maintenance again. This should extend object locks for things if
 			// they exist.
-			err = w.RepoMaintenance(ctx, mOpts)
+			err = w.RepoMaintenance(ctx, ms, mOpts)
 			require.NoError(t, err, clues.ToCore(err))
 		})
 	}
diff --git a/src/internal/operations/maintenance.go b/src/internal/operations/maintenance.go
index a00f13272..4c4003733 100644
--- a/src/internal/operations/maintenance.go
+++ b/src/internal/operations/maintenance.go
@@ -14,6 +14,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/control/repository"
 	"github.com/alcionai/corso/src/pkg/count"
+	"github.com/alcionai/corso/src/pkg/store"
 )
 
 // MaintenanceOperation wraps an operation with restore-specific props.
@@ -33,17 +34,18 @@ func NewMaintenanceOperation(
 	ctx context.Context,
 	opts control.Options,
 	kw *kopia.Wrapper,
+	storer store.BackupStorer,
 	mOpts repository.Maintenance,
 	bus events.Eventer,
 ) (MaintenanceOperation, error) {
 	op := MaintenanceOperation{
-		operation: newOperation(opts, bus, count.New(), kw, nil),
+		operation: newOperation(opts, bus, count.New(), kw, storer),
 		mOpts:     mOpts,
 	}
 
-	// Don't run validation because we don't populate the model store.
+	err := op.validate()
 
-	return op, nil
+	return op, clues.Stack(err).OrNil()
 }
 
 func (op *MaintenanceOperation) Run(ctx context.Context) (err error) {
@@ -83,7 +85,7 @@ func (op *MaintenanceOperation) do(ctx context.Context) error {
 		op.Results.CompletedAt = time.Now()
 	}()
 
-	err := op.operation.kopia.RepoMaintenance(ctx, op.mOpts)
+	err := op.operation.kopia.RepoMaintenance(ctx, op.store, op.mOpts)
 	if err != nil {
 		op.Status = Failed
 		return clues.Wrap(err, "running maintenance operation")
diff --git a/src/internal/operations/maintenance_test.go b/src/internal/operations/maintenance_test.go
index 6c822e1cd..9959e8fb6 100644
--- a/src/internal/operations/maintenance_test.go
+++ b/src/internal/operations/maintenance_test.go
@@ -1,22 +1,60 @@
 package operations
 
 import (
+	"context"
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	evmock "github.com/alcionai/corso/src/internal/events/mock"
 	"github.com/alcionai/corso/src/internal/kopia"
+	"github.com/alcionai/corso/src/internal/m365/graph"
+	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
+	"github.com/alcionai/corso/src/internal/m365/support"
+	"github.com/alcionai/corso/src/internal/model"
+	opMock "github.com/alcionai/corso/src/internal/operations/inject/mock"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/internal/tester/tconfig"
+	"github.com/alcionai/corso/src/pkg/account"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/control/repository"
+	"github.com/alcionai/corso/src/pkg/path"
+	"github.com/alcionai/corso/src/pkg/selectors"
+	selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
 	storeTD "github.com/alcionai/corso/src/pkg/storage/testdata"
+	"github.com/alcionai/corso/src/pkg/store"
 )
 
+func getKopiaHandles(
+	t *testing.T,
+	ctx context.Context, //revive:disable-line:context-as-argument
+) (*kopia.Wrapper, *kopia.ModelStore) {
+	st := storeTD.NewPrefixedS3Storage(t)
+	k := kopia.NewConn(st)
+	err := k.Initialize(ctx, repository.Options{}, repository.Retention{})
+	require.NoError(t, err, clues.ToCore(err))
+
+	kw, err := kopia.NewWrapper(k)
+	// kopiaRef comes with a count of 1 and Wrapper bumps it again so safe
+	// to close here.
+	k.Close(ctx)
+
+	require.NoError(t, err, "getting kopia wrapper: %s", clues.ToCore(err))
+
+	ms, err := kopia.NewModelStore(k)
+	require.NoError(t, err, "getting model store: %s", clues.ToCore(err))
+
+	return kw, ms
+}
+
 type MaintenanceOpIntegrationSuite struct {
 	tester.Suite
 }
@@ -30,32 +68,21 @@ func TestMaintenanceOpIntegrationSuite(t *testing.T) {
 }
 
 func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() {
-	var (
-		t = suite.T()
-		// need to initialize the repository before we can test connecting to it.
-		st = storeTD.NewPrefixedS3Storage(t)
-		k  = kopia.NewConn(st)
-	)
+	t := suite.T()
 
 	ctx, flush := tester.NewContext(t)
 	defer flush()
 
-	err := k.Initialize(ctx, repository.Options{}, repository.Retention{})
-	require.NoError(t, err, clues.ToCore(err))
-
-	kw, err := kopia.NewWrapper(k)
-	// kopiaRef comes with a count of 1 and Wrapper bumps it again so safe
-	// to close here.
-	k.Close(ctx)
-
-	require.NoError(t, err, clues.ToCore(err))
+	kw, ms := getKopiaHandles(t, ctx)
 
 	defer kw.Close(ctx)
+	defer ms.Close(ctx)
 
 	mo, err := NewMaintenanceOperation(
 		ctx,
 		control.DefaultOptions(),
 		kw,
+		store.NewWrapper(ms),
 		repository.Maintenance{
 			Type: repository.MetadataMaintenance,
 		},
@@ -65,3 +92,162 @@ func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() {
 	err = mo.Run(ctx)
 	assert.NoError(t, err, clues.ToCore(err))
 }
+
+type MaintenanceOpNightlySuite struct {
+	tester.Suite
+}
+
+func TestMaintenanceOpNightlySuite(t *testing.T) {
+	suite.Run(t, &MaintenanceOpNightlySuite{
+		Suite: tester.NewNightlySuite(
+			t,
+			[][]string{storeTD.AWSStorageCredEnvs, tconfig.M365AcctCredEnvs}),
+	})
+}
+
+func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection() {
+	var (
+		t             = suite.T()
+		acct          = tconfig.NewM365Account(suite.T())
+		tenantID      = acct.Config[account.AzureTenantIDKey]
+		opts          = control.DefaultOptions()
+		osel          = selectors.NewOneDriveBackup([]string{userID})
+		// Default policy used by SDK clients
+		failurePolicy = control.FailAfterRecovery
+		T1            = time.Now().Truncate(0)
+	)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	osel.Include(selTD.OneDriveBackupFolderScope(osel))
+
+	pathElements := []string{odConsts.DrivesPathDir, "drive-id", odConsts.RootPathDir, folderID}
+
+	tmp, err := path.Build(tenantID, userID, path.OneDriveService, path.FilesCategory, false, pathElements...)
+	require.NoError(suite.T(), err, clues.ToCore(err))
+
+	locPath := path.Builder{}.Append(tmp.Folders()...)
+
+	kw, ms := getKopiaHandles(t, ctx)
+	storer := store.NewWrapper(ms)
+
+	var bupIDs []model.StableID
+
+	// Make two failed backups so the garbage collection code will try to delete
+	// something.
+	for i := 0; i < 2; i++ {
+		suite.Run(fmt.Sprintf("Setup%d", i), func() {
+			t := suite.T()
+
+			ctx, flush := tester.NewContext(t)
+			defer flush()
+
+			cs := []data.BackupCollection{
+				makeBackupCollection(
+					tmp,
+					locPath,
+					[]dataMock.Item{
+						makeMockItem("file1", nil, T1, false, nil),
+						makeMockItem("file2", nil, T1, false, assert.AnError),
+					}),
+			}
+
+			prefixPath, err := path.BuildMetadata(
+				tenantID,
+				userID,
+				path.OneDriveService,
+				path.FilesCategory,
+				false)
+			require.NoError(t, err, clues.ToCore(err))
+
+			mc, err := graph.MakeMetadataCollection(
+				prefixPath,
+				makeMetadataCollectionEntries("url/1", driveID, folderID, tmp),
+				func(*support.ControllerOperationStatus) {})
+			require.NoError(t, err, clues.ToCore(err))
+
+			cs = append(cs, mc)
+			bp := opMock.NewMockBackupProducer(cs, data.CollectionStats{}, false)
+
+			opts.FailureHandling = failurePolicy
+
+			bo, err := NewBackupOperation(
+				ctx,
+				opts,
+				kw,
+				storer,
+				&bp,
+				acct,
+				osel.Selector,
+				selectors.Selector{DiscreteOwner: userID},
+				evmock.NewBus())
+			require.NoError(t, err, clues.ToCore(err))
+
+			err = bo.Run(ctx)
+			assert.Error(t, err, clues.ToCore(err))
+
+			require.NotEmpty(t, bo.Results.BackupID)
+
+			bupIDs = append(bupIDs, bo.Results.BackupID)
+		})
+	}
+
+	// Double check we have two backup models. This is not an exhaustive check but
+	// will give us some comfort that things are working as expected.
+	bups, err := storer.GetBackups(ctx)
+	require.NoError(
+		t,
+		err,
+		"checking backup model existence: %s",
+		clues.ToCore(err))
+
+	var gotBupIDs []model.StableID
+
+	for _, bup := range bups {
+		gotBupIDs = append(gotBupIDs, bup.ID)
+	}
+
+	require.ElementsMatch(t, bupIDs, gotBupIDs)
+
+	// Run maintenance with garbage collection.
+
+	suite.Run("RunMaintenance", func() {
+		t := suite.T()
+
+		ctx, flush := tester.NewContext(t)
+		defer flush()
+
+		mo, err := NewMaintenanceOperation(
+			ctx,
+			control.DefaultOptions(),
+			kw,
+			store.NewWrapper(ms),
+			repository.Maintenance{
+				Type: repository.CompleteMaintenance,
+				// Set buffer to 0 so things will actually be garbage collected.
+				CleanupBuffer: ptr.To(time.Duration(0)),
+			},
+			evmock.NewBus())
+		require.NoError(t, err, clues.ToCore(err))
+
+		err = mo.Run(ctx)
+		assert.NoError(t, err, clues.ToCore(err))
+
+		// Check for backup models again. Only the second one should still be present.
+		bups, err = storer.GetBackups(ctx)
+		require.NoError(
+			t,
+			err,
+			"checking backup model existence after maintenance: %s",
+			clues.ToCore(err))
+
+		gotBupIDs = nil
+
+		for _, bup := range bups {
+			gotBupIDs = append(gotBupIDs, bup.ID)
+		}
+
+		assert.ElementsMatch(t, bupIDs[1:], gotBupIDs)
+	})
+}
diff --git a/src/pkg/control/repository/repo.go b/src/pkg/control/repository/repo.go
index 791e05131..9f39278d9 100644
--- a/src/pkg/control/repository/repo.go
+++ b/src/pkg/control/repository/repo.go
@@ -15,9 +15,10 @@ type Options struct {
 }
 
 type Maintenance struct {
-	Type   MaintenanceType   `json:"type"`
-	Safety MaintenanceSafety `json:"safety"`
-	Force  bool              `json:"force"`
+	Type          MaintenanceType   `json:"type"`
+	Safety        MaintenanceSafety `json:"safety"`
+	Force         bool              `json:"force"`
+	CleanupBuffer *time.Duration
 }
 
 // ---------------------------------------------------------------------------
diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go
index 4cd361e07..1189a3ef1 100644
--- a/src/pkg/repository/repository.go
+++ b/src/pkg/repository/repository.go
@@ -414,6 +414,7 @@ func (r repository) NewMaintenance(
 		ctx,
 		r.Opts,
 		r.dataLayer,
+		store.NewWrapper(r.modelStore),
 		mOpts,
 		r.Bus)
 }
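Usage note (not part of the diff): a minimal sketch of how a caller outside the
test suites might opt into the new cleanup pass during complete maintenance.
The helper name runCompleteMaintenance, the 24-hour CleanupBuffer value, and the
events.Eventer argument are illustrative assumptions; leaving CleanupBuffer nil
falls back to the seven-day buffer shown in wrapper.go above.

package example

import (
	"context"
	"time"

	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/events"
	"github.com/alcionai/corso/src/internal/kopia"
	"github.com/alcionai/corso/src/internal/operations"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/control/repository"
	"github.com/alcionai/corso/src/pkg/store"
)

// runCompleteMaintenance runs kopia maintenance plus the orphaned-backup
// cleanup added in this change, using a shortened garbage-collection buffer.
func runCompleteMaintenance(
	ctx context.Context,
	kw *kopia.Wrapper,
	ms *kopia.ModelStore,
	bus events.Eventer,
) error {
	mo, err := operations.NewMaintenanceOperation(
		ctx,
		control.DefaultOptions(),
		kw,
		// The storer gives the cleanup pass access to backup models.
		store.NewWrapper(ms),
		repository.Maintenance{
			Type:   repository.CompleteMaintenance,
			Safety: repository.FullMaintenanceSafety,
			// Hypothetical value; omit (nil) to keep the 7-day default.
			CleanupBuffer: ptr.To(24 * time.Hour),
		},
		bus)
	if err != nil {
		return err
	}

	return mo.Run(ctx)
}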