Compare commits
7 Commits
main
...
partial_ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58f5556f1a | ||
|
|
5aedd7e3ed | ||
|
|
52f462abf9 | ||
|
|
748ff803d1 | ||
|
|
d64b8844a5 | ||
|
|
b8a75434c9 | ||
|
|
735ea9fa09 |
@ -452,17 +452,26 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphoreCh }()
|
||||
|
||||
var itf []extensions.CreateItemExtensioner
|
||||
|
||||
if item.GetFile() != nil {
|
||||
itf = []extensions.CreateItemExtensioner{
|
||||
extensions.GetMockExtensions(),
|
||||
}
|
||||
}
|
||||
|
||||
// Read the item
|
||||
oc.populateDriveItem(
|
||||
ctx,
|
||||
parentPath,
|
||||
item,
|
||||
&stats,
|
||||
oc.ctrl.ItemExtensionFactory,
|
||||
itf,
|
||||
errs)
|
||||
|
||||
folderProgress <- struct{}{}
|
||||
}(item)
|
||||
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
@ -32,7 +32,8 @@ const (
|
||||
|
||||
// common tags for filtering
|
||||
const (
|
||||
ServiceTag = "service"
|
||||
ServiceTag = "service"
|
||||
PartialBackupTag = "partialBackup"
|
||||
)
|
||||
|
||||
// Valid returns true if the ModelType value fits within the iota range.
|
||||
|
||||
@ -227,6 +227,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
op.Errors.Fail(clues.Wrap(err, "running backup"))
|
||||
}
|
||||
|
||||
isAtleastPartialBackup := deets != nil && !deets.Empty()
|
||||
|
||||
finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup")
|
||||
LogFaultErrors(ctx, op.Errors.Errors(), "running backup")
|
||||
|
||||
@ -235,7 +237,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
// -----
|
||||
|
||||
err = op.persistResults(startTime, &opStats)
|
||||
if err != nil {
|
||||
if err != nil && !isAtleastPartialBackup {
|
||||
op.Errors.Fail(clues.Wrap(err, "persisting backup results"))
|
||||
return op.Errors.Failure()
|
||||
}
|
||||
@ -250,17 +252,21 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
Infow("completed backup; conditional error forcing exit without model persistence",
|
||||
"results", op.Results)
|
||||
|
||||
return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure()
|
||||
if !isAtleastPartialBackup {
|
||||
return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(pandeyabs): Pass a flag to mark the backup as partial
|
||||
err = op.createBackupModels(
|
||||
ctx,
|
||||
sstore,
|
||||
opStats.k.SnapshotID,
|
||||
op.Results.BackupID,
|
||||
op.BackupVersion,
|
||||
deets.Details())
|
||||
deets.Details(),
|
||||
isAtleastPartialBackup)
|
||||
if err != nil {
|
||||
op.Errors.Fail(clues.Wrap(err, "persisting backup"))
|
||||
return op.Errors.Failure()
|
||||
@ -344,7 +350,7 @@ func (op *BackupOperation) do(
|
||||
backupID,
|
||||
op.incremental && canUseMetadata && canUsePreviousBackup,
|
||||
op.Errors)
|
||||
if err != nil {
|
||||
if err != nil && (deets == nil || deets.Empty()) {
|
||||
return nil, clues.Wrap(err, "persisting collection backups")
|
||||
}
|
||||
|
||||
@ -851,6 +857,7 @@ func (op *BackupOperation) createBackupModels(
|
||||
backupID model.StableID,
|
||||
backupVersion int,
|
||||
deets *details.Details,
|
||||
isPartialBackup bool,
|
||||
) error {
|
||||
ctx = clues.Add(ctx, "snapshot_id", snapID, "backup_id", backupID)
|
||||
// generate a new fault bus so that we can maintain clean
|
||||
@ -891,7 +898,8 @@ func (op *BackupOperation) createBackupModels(
|
||||
op.ResourceOwner.Name(),
|
||||
op.Results.ReadWrites,
|
||||
op.Results.StartAndEndTime,
|
||||
op.Errors.Errors())
|
||||
op.Errors.Errors(),
|
||||
isPartialBackup)
|
||||
|
||||
logger.Ctx(ctx).Info("creating new backup")
|
||||
|
||||
|
||||
@ -79,6 +79,7 @@ func New(
|
||||
rw stats.ReadWrites,
|
||||
se stats.StartAndEndTime,
|
||||
fe *fault.Errors,
|
||||
isPartialBackup bool,
|
||||
) *Backup {
|
||||
if fe == nil {
|
||||
fe = &fault.Errors{}
|
||||
@ -115,7 +116,8 @@ func New(
|
||||
BaseModel: model.BaseModel{
|
||||
ID: id,
|
||||
Tags: map[string]string{
|
||||
model.ServiceTag: selector.PathService().String(),
|
||||
model.ServiceTag: selector.PathService().String(),
|
||||
model.PartialBackupTag: strconv.FormatBool(isPartialBackup),
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@ -401,6 +401,13 @@ func (b *Builder) Details() *Details {
|
||||
return details
|
||||
}
|
||||
|
||||
func (b *Builder) Empty() bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
return len(b.d.Entries) == 0
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------------------
|
||||
// Details
|
||||
// --------------------------------------------------------------------------------
|
||||
|
||||
@ -29,9 +29,14 @@ type MockExtension struct {
|
||||
Ctx context.Context
|
||||
FailOnRead bool
|
||||
FailOnClose bool
|
||||
ItemNumber int32
|
||||
}
|
||||
|
||||
func (me *MockExtension) Read(p []byte) (int, error) {
|
||||
if me.ItemNumber > 1 {
|
||||
return 0, clues.New("mock read error")
|
||||
}
|
||||
|
||||
if me.FailOnRead {
|
||||
return 0, clues.New("mock read error")
|
||||
}
|
||||
@ -55,6 +60,11 @@ func (me *MockExtension) Read(p []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (me *MockExtension) Close() error {
|
||||
// if me.itemCount > 3 {
|
||||
// return clues.New("mock close error")
|
||||
// }
|
||||
|
||||
// atomic.AddInt32(&me.itemCount, 1)
|
||||
if me.FailOnClose {
|
||||
return clues.New("mock close error")
|
||||
}
|
||||
@ -66,6 +76,8 @@ func (me *MockExtension) Close() error {
|
||||
|
||||
me.ExtData.Data[KNumBytes] = me.NumBytes
|
||||
me.ExtData.Data[KCrc32] = me.Crc32
|
||||
me.ExtData.Data["ItemNumber"] = me.ItemNumber
|
||||
|
||||
logger.Ctx(me.Ctx).Infow(
|
||||
"mock extension closed",
|
||||
KNumBytes, me.NumBytes, KCrc32, me.Crc32)
|
||||
@ -77,6 +89,7 @@ type MockItemExtensionFactory struct {
|
||||
FailOnFactoryCreation bool
|
||||
FailOnRead bool
|
||||
FailOnClose bool
|
||||
ItemNumber int32
|
||||
}
|
||||
|
||||
func (m *MockItemExtensionFactory) CreateItemExtension(
|
||||
@ -89,6 +102,8 @@ func (m *MockItemExtensionFactory) CreateItemExtension(
|
||||
return nil, clues.New("factory error")
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Infow("mock extension created", "itemnumber", m.ItemNumber)
|
||||
|
||||
return &MockExtension{
|
||||
Ctx: ctx,
|
||||
InnerRc: rc,
|
||||
@ -96,5 +111,17 @@ func (m *MockItemExtensionFactory) CreateItemExtension(
|
||||
ExtData: extData,
|
||||
FailOnRead: m.FailOnRead,
|
||||
FailOnClose: m.FailOnClose,
|
||||
ItemNumber: m.ItemNumber,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var ctr int32
|
||||
|
||||
func GetMockExtensions() CreateItemExtensioner {
|
||||
atomic.AddInt32(&ctr, 1)
|
||||
mf := &MockItemExtensionFactory{
|
||||
ItemNumber: ctr,
|
||||
}
|
||||
|
||||
return mf
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user