Wire backup cleanup into maintenance operation (#4091)
Wire backup cleanup into maintenance operation. Currently it runs under a different mode so it won't affect existing users of the maintenance command. Once we're satisfied with testing we can remove the temporary const and run this when complete maintenance is run.

---

#### Does this PR need a docs update or release note?

- [ ] ✅ Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ] ⛔ No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #3217

#### Test Plan

- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
parent: b33231e98d
commit: 09aebc4a31
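As a rough usage sketch (not part of the diff): the new `CleanupBuffer` field on `repository.Maintenance` controls how recent a failed backup must be to survive cleanup, and `RepoMaintenance` falls back to a 7-day buffer when it's unset. The helper function, the `kw`/`storer` handles, and the 24-hour value below are illustrative assumptions; the types, fields, and the `RepoMaintenance` signature come from the diff.

```go
package main

import (
	"context"
	"time"

	"github.com/alcionai/corso/src/internal/kopia"
	"github.com/alcionai/corso/src/pkg/control/repository"
	"github.com/alcionai/corso/src/pkg/store"
)

// runCompleteMaintenance is a hedged sketch of driving the new cleanup path;
// kw and storer are assumed to be connected handles, as in the tests below.
func runCompleteMaintenance(
	ctx context.Context,
	kw *kopia.Wrapper,
	storer store.Storer,
) error {
	buffer := 24 * time.Hour

	opts := repository.Maintenance{
		Type:   repository.CompleteMaintenance,
		Safety: repository.FullMaintenanceSafety,
		// Failed backups younger than this are spared by the orphan cleanup.
		// When nil, RepoMaintenance uses a 7-day default.
		CleanupBuffer: &buffer,
	}

	// Cleanup of failed/incomplete backups runs before kopia's own
	// maintenance, and a cleanup failure is logged rather than failing
	// the whole operation.
	return kw.RepoMaintenance(ctx, storer, opts)
}
```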
```diff
@@ -57,6 +57,11 @@ func cleanupOrphanedData(
 	gcBuffer time.Duration,
+	nowFunc func() time.Time,
 ) error {
+	logger.Ctx(ctx).Infow(
+		"cleaning up failed and incomplete backups",
+		"current_time", nowFunc(),
+		"buffer_duration", gcBuffer)
 
 	// Get all snapshot manifests.
 	snaps, err := mf.FindManifests(
 		ctx,
```
```diff
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"strings"
+	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"
@@ -575,6 +576,7 @@ func isErrEntryNotFound(err error) bool {
 
 func (w Wrapper) RepoMaintenance(
 	ctx context.Context,
+	storer store.Storer,
 	opts repository.Maintenance,
 ) error {
 	kopiaSafety, err := translateSafety(opts.Safety)
@@ -596,6 +598,22 @@ func (w Wrapper) RepoMaintenance(
 		"force", opts.Force,
 		"current_local_owner", clues.Hide(currentOwner))
 
+	// Check if we should do additional cleanup prior to running kopia's
+	// maintenance.
+	if opts.Type == repository.CompleteMaintenance {
+		buffer := time.Hour * 24 * 7
+		if opts.CleanupBuffer != nil {
+			buffer = *opts.CleanupBuffer
+		}
+
+		// Even if we fail this we don't want to fail the overall maintenance
+		// operation since there's other useful work we can still do.
+		if err := cleanupOrphanedData(ctx, storer, w.c, buffer, time.Now); err != nil {
+			logger.CtxErr(ctx, err).Info(
+				"cleaning up failed backups, some space may not be freed")
+		}
+	}
+
 	dr, ok := w.c.Repository.(repo.DirectRepository)
 	if !ok {
 		return clues.New("unable to get valid handle to repo").WithClues(ctx)
```
```diff
@@ -194,7 +194,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_FirstRun_NoChanges() {
 		Type:   repository.MetadataMaintenance,
 	}
 
-	err = w.RepoMaintenance(ctx, opts)
+	err = w.RepoMaintenance(ctx, nil, opts)
 	require.NoError(t, err, clues.ToCore(err))
 }
 
@@ -215,7 +215,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails
 	}
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	err = k.Close(ctx)
@@ -231,7 +231,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_NoForce_Fails
 
 	var notOwnedErr maintenance.NotOwnedError
 
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	assert.ErrorAs(t, err, &notOwnedErr, clues.ToCore(err))
 }
 
@@ -252,7 +252,7 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed
 	}
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	err = k.Close(ctx)
@@ -269,13 +269,13 @@ func (suite *BasicKopiaIntegrationSuite) TestMaintenance_WrongUser_Force_Succeed
 	mOpts.Force = true
 
 	// This will set the user.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	mOpts.Force = false
 
 	// Running without force should succeed now.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 }
 
@@ -546,7 +546,7 @@ func (suite *RetentionIntegrationSuite) TestSetRetentionParameters_And_Maintenan
 	// This will set common maintenance config parameters. There's some interplay
 	// between the maintenance schedule and retention period that we want to check
 	// below.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, nil, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	// Enable retention.
@@ -640,6 +640,9 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 
 	w := &Wrapper{k}
 
+	ms, err := NewModelStore(k)
+	require.NoError(t, err, "getting model store: %s", clues.ToCore(err))
+
 	mOpts := repository.Maintenance{
 		Safety: repository.FullMaintenanceSafety,
 		Type:   repository.CompleteMaintenance,
@@ -648,7 +651,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 	// This will set common maintenance config parameters. There's some interplay
 	// between the maintenance schedule and retention period that we want to check
 	// below.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, ms, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 
 	// Enable retention.
@@ -667,10 +670,15 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 		time.Hour*48,
 		assert.True)
 
+	err = ms.Close(ctx)
+	require.NoError(t, err, clues.ToCore(err))
+
 	if test.reopen {
 		mustReopen(t, ctx, w)
 	}
 
+	ms.c = w.c
+
 	// Change retention duration without updating mode.
 	err = w.SetRetentionParameters(ctx, repository.Retention{
 		Duration: ptr.To(time.Hour * 96),
@@ -687,7 +695,7 @@ func (suite *RetentionIntegrationSuite) TestSetAndUpdateRetentionParameters_RunM
 
 	// Run full maintenance again. This should extend object locks for things if
 	// they exist.
-	err = w.RepoMaintenance(ctx, mOpts)
+	err = w.RepoMaintenance(ctx, ms, mOpts)
 	require.NoError(t, err, clues.ToCore(err))
 	})
 }
```
```diff
@@ -14,6 +14,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/control/repository"
 	"github.com/alcionai/corso/src/pkg/count"
+	"github.com/alcionai/corso/src/pkg/store"
 )
 
 // MaintenanceOperation wraps an operation with restore-specific props.
@@ -33,17 +34,18 @@ func NewMaintenanceOperation(
 	ctx context.Context,
 	opts control.Options,
 	kw *kopia.Wrapper,
+	storer store.BackupStorer,
 	mOpts repository.Maintenance,
 	bus events.Eventer,
 ) (MaintenanceOperation, error) {
 	op := MaintenanceOperation{
-		operation: newOperation(opts, bus, count.New(), kw, nil),
+		operation: newOperation(opts, bus, count.New(), kw, storer),
 		mOpts:     mOpts,
 	}
 
-	// Don't run validation because we don't populate the model store.
+	err := op.validate()
 
-	return op, nil
+	return op, clues.Stack(err).OrNil()
 }
 
 func (op *MaintenanceOperation) Run(ctx context.Context) (err error) {
@@ -83,7 +85,7 @@ func (op *MaintenanceOperation) do(ctx context.Context) error {
 		op.Results.CompletedAt = time.Now()
 	}()
 
-	err := op.operation.kopia.RepoMaintenance(ctx, op.mOpts)
+	err := op.operation.kopia.RepoMaintenance(ctx, op.store, op.mOpts)
 	if err != nil {
 		op.Status = Failed
 		return clues.Wrap(err, "running maintenance operation")
```
```diff
@@ -1,22 +1,60 @@
 package operations
 
 import (
 	"context"
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	evmock "github.com/alcionai/corso/src/internal/events/mock"
 	"github.com/alcionai/corso/src/internal/kopia"
+	"github.com/alcionai/corso/src/internal/m365/graph"
+	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
+	"github.com/alcionai/corso/src/internal/m365/support"
+	"github.com/alcionai/corso/src/internal/model"
+	opMock "github.com/alcionai/corso/src/internal/operations/inject/mock"
 	"github.com/alcionai/corso/src/internal/tester"
+	"github.com/alcionai/corso/src/internal/tester/tconfig"
+	"github.com/alcionai/corso/src/pkg/account"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/control/repository"
+	"github.com/alcionai/corso/src/pkg/path"
+	"github.com/alcionai/corso/src/pkg/selectors"
+	selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
 	storeTD "github.com/alcionai/corso/src/pkg/storage/testdata"
 	"github.com/alcionai/corso/src/pkg/store"
 )
 
+func getKopiaHandles(
+	t *testing.T,
+	ctx context.Context, //revive:disable-line:context-as-argument
+) (*kopia.Wrapper, *kopia.ModelStore) {
+	st := storeTD.NewPrefixedS3Storage(t)
+	k := kopia.NewConn(st)
+	err := k.Initialize(ctx, repository.Options{}, repository.Retention{})
+	require.NoError(t, err, clues.ToCore(err))
+
+	kw, err := kopia.NewWrapper(k)
+	// kopiaRef comes with a count of 1 and Wrapper bumps it again so safe
+	// to close here.
+	k.Close(ctx)
+
+	require.NoError(t, err, "getting kopia wrapper: %s", clues.ToCore(err))
+
+	ms, err := kopia.NewModelStore(k)
+	require.NoError(t, err, "getting model store: %s", clues.ToCore(err))
+
+	return kw, ms
+}
+
 type MaintenanceOpIntegrationSuite struct {
 	tester.Suite
 }
@@ -30,32 +68,21 @@ func TestMaintenanceOpIntegrationSuite(t *testing.T) {
 }
 
 func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() {
-	var (
-		t = suite.T()
-		// need to initialize the repository before we can test connecting to it.
-		st = storeTD.NewPrefixedS3Storage(t)
-		k  = kopia.NewConn(st)
-	)
+	t := suite.T()
 
 	ctx, flush := tester.NewContext(t)
 	defer flush()
 
-	err := k.Initialize(ctx, repository.Options{}, repository.Retention{})
-	require.NoError(t, err, clues.ToCore(err))
-
-	kw, err := kopia.NewWrapper(k)
-	// kopiaRef comes with a count of 1 and Wrapper bumps it again so safe
-	// to close here.
-	k.Close(ctx)
-
-	require.NoError(t, err, clues.ToCore(err))
+	kw, ms := getKopiaHandles(t, ctx)
 
 	defer kw.Close(ctx)
+	defer ms.Close(ctx)
 
 	mo, err := NewMaintenanceOperation(
 		ctx,
 		control.DefaultOptions(),
 		kw,
+		store.NewWrapper(ms),
 		repository.Maintenance{
 			Type: repository.MetadataMaintenance,
 		},
@@ -65,3 +92,162 @@ func (suite *MaintenanceOpIntegrationSuite) TestRepoMaintenance() {
 	err = mo.Run(ctx)
 	assert.NoError(t, err, clues.ToCore(err))
 }
+
+type MaintenanceOpNightlySuite struct {
+	tester.Suite
+}
+
+func TestMaintenanceOpNightlySuite(t *testing.T) {
+	suite.Run(t, &MaintenanceOpNightlySuite{
+		Suite: tester.NewNightlySuite(
+			t,
+			[][]string{storeTD.AWSStorageCredEnvs, tconfig.M365AcctCredEnvs}),
+	})
+}
+
+func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection() {
+	var (
+		t        = suite.T()
+		acct     = tconfig.NewM365Account(suite.T())
+		tenantID = acct.Config[account.AzureTenantIDKey]
+		opts     = control.DefaultOptions()
+		osel     = selectors.NewOneDriveBackup([]string{userID})
+		// Default policy used by SDK clients
+		failurePolicy = control.FailAfterRecovery
+		T1            = time.Now().Truncate(0)
+	)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	osel.Include(selTD.OneDriveBackupFolderScope(osel))
+
+	pathElements := []string{odConsts.DrivesPathDir, "drive-id", odConsts.RootPathDir, folderID}
+
+	tmp, err := path.Build(tenantID, userID, path.OneDriveService, path.FilesCategory, false, pathElements...)
+	require.NoError(suite.T(), err, clues.ToCore(err))
+
+	locPath := path.Builder{}.Append(tmp.Folders()...)
+
+	kw, ms := getKopiaHandles(t, ctx)
+	storer := store.NewWrapper(ms)
+
+	var bupIDs []model.StableID
+
+	// Make two failed backups so the garbage collection code will try to delete
+	// something.
+	for i := 0; i < 2; i++ {
+		suite.Run(fmt.Sprintf("Setup%d", i), func() {
+			t := suite.T()
+
+			ctx, flush := tester.NewContext(t)
+			defer flush()
+
+			cs := []data.BackupCollection{
+				makeBackupCollection(
+					tmp,
+					locPath,
+					[]dataMock.Item{
+						makeMockItem("file1", nil, T1, false, nil),
+						makeMockItem("file2", nil, T1, false, assert.AnError),
+					}),
+			}
+
+			prefixPath, err := path.BuildMetadata(
+				tenantID,
+				userID,
+				path.OneDriveService,
+				path.FilesCategory,
+				false)
+			require.NoError(t, err, clues.ToCore(err))
+
+			mc, err := graph.MakeMetadataCollection(
+				prefixPath,
+				makeMetadataCollectionEntries("url/1", driveID, folderID, tmp),
+				func(*support.ControllerOperationStatus) {})
+			require.NoError(t, err, clues.ToCore(err))
+
+			cs = append(cs, mc)
+			bp := opMock.NewMockBackupProducer(cs, data.CollectionStats{}, false)
+
+			opts.FailureHandling = failurePolicy
+
+			bo, err := NewBackupOperation(
+				ctx,
+				opts,
+				kw,
+				storer,
+				&bp,
+				acct,
+				osel.Selector,
+				selectors.Selector{DiscreteOwner: userID},
+				evmock.NewBus())
+			require.NoError(t, err, clues.ToCore(err))
+
+			err = bo.Run(ctx)
+			assert.Error(t, err, clues.ToCore(err))
+
+			require.NotEmpty(t, bo.Results.BackupID)
+
+			bupIDs = append(bupIDs, bo.Results.BackupID)
+		})
+	}
+
+	// Double check we have two backup models. This is not an exhaustive check but
+	// will give us some comfort that things are working as expected.
+	bups, err := storer.GetBackups(ctx)
+	require.NoError(
+		t,
+		err,
+		"checking backup model existence: %s",
+		clues.ToCore(err))
+
+	var gotBupIDs []model.StableID
+
+	for _, bup := range bups {
+		gotBupIDs = append(gotBupIDs, bup.ID)
+	}
+
+	require.ElementsMatch(t, bupIDs, gotBupIDs)
+
+	// Run maintenance with garbage collection.
+
+	suite.Run("RunMaintenance", func() {
+		t := suite.T()
+
+		ctx, flush := tester.NewContext(t)
+		defer flush()
+
+		mo, err := NewMaintenanceOperation(
+			ctx,
+			control.DefaultOptions(),
+			kw,
+			store.NewWrapper(ms),
+			repository.Maintenance{
+				Type: repository.CompleteMaintenance,
+				// Set buffer to 0 so things will actually be garbage collected.
+				CleanupBuffer: ptr.To(time.Duration(0)),
+			},
+			evmock.NewBus())
+		require.NoError(t, err, clues.ToCore(err))
+
+		err = mo.Run(ctx)
+		assert.NoError(t, err, clues.ToCore(err))
+
+		// Check for backup models again. Only the second one should still be present.
+		bups, err = storer.GetBackups(ctx)
+		require.NoError(
+			t,
+			err,
+			"checking backup model existence after maintenance: %s",
+			clues.ToCore(err))
+
+		gotBupIDs = nil
+
+		for _, bup := range bups {
+			gotBupIDs = append(gotBupIDs, bup.ID)
+		}
+
+		assert.ElementsMatch(t, bupIDs[1:], gotBupIDs)
+	})
+}
```
```diff
@@ -15,9 +15,10 @@ type Options struct {
 }
 
 type Maintenance struct {
-	Type   MaintenanceType   `json:"type"`
-	Safety MaintenanceSafety `json:"safety"`
-	Force  bool              `json:"force"`
+	Type          MaintenanceType   `json:"type"`
+	Safety        MaintenanceSafety `json:"safety"`
+	Force         bool              `json:"force"`
+	CleanupBuffer *time.Duration
 }
 
 // ---------------------------------------------------------------------------
 
@@ -414,6 +414,7 @@ func (r repository) NewMaintenance(
 		ctx,
 		r.Opts,
 		r.dataLayer,
+		store.NewWrapper(r.modelStore),
 		mOpts,
 		r.Bus)
 }
```