Compare commits

7 Commits

main...partial_ba

| Author | SHA1 | Date |
|---|---|---|
| | 58f5556f1a | |
| | 5aedd7e3ed | |
| | 52f462abf9 | |
| | 748ff803d1 | |
| | d64b8844a5 | |
| | b8a75434c9 | |
| | 735ea9fa09 | |

```diff
@@ -452,17 +452,26 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
 			defer wg.Done()
 			defer func() { <-semaphoreCh }()
 
+			var itf []extensions.CreateItemExtensioner
+
+			if item.GetFile() != nil {
+				itf = []extensions.CreateItemExtensioner{
+					extensions.GetMockExtensions(),
+				}
+			}
+
 			// Read the item
 			oc.populateDriveItem(
 				ctx,
 				parentPath,
 				item,
 				&stats,
-				oc.ctrl.ItemExtensionFactory,
+				itf,
 				errs)
 
 			folderProgress <- struct{}{}
 		}(item)
 	}
 
 	wg.Wait()
```
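
A minimal, self-contained sketch (not code from this PR) of the per-item extension wiring the hunk above switches to, using hypothetical stand-in types rather than the repo's actual `extensions.CreateItemExtensioner` interface: each item receives its own slice of extensions that wrap the item's reader, instead of a single controller-wide factory.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// ItemExtension is a hypothetical stand-in for the repo's extension interface;
// the real CreateItemExtensioner factory has a different signature.
type ItemExtension interface {
	Wrap(rc io.ReadCloser) (io.ReadCloser, error)
}

// passthrough stands in for extensions.GetMockExtensions(); it wraps nothing.
type passthrough struct{}

func (passthrough) Wrap(rc io.ReadCloser) (io.ReadCloser, error) { return rc, nil }

// populateItem mirrors the shape of the change: the caller hands each item its
// own slice of extensions, applied in order around the item's reader.
func populateItem(rc io.ReadCloser, exts []ItemExtension) (io.ReadCloser, error) {
	for _, ext := range exts {
		wrapped, err := ext.Wrap(rc)
		if err != nil {
			return nil, err
		}

		rc = wrapped
	}

	return rc, nil
}

func main() {
	item := io.NopCloser(strings.NewReader("item payload"))

	rc, err := populateItem(item, []ItemExtension{passthrough{}})
	if err != nil {
		panic(err)
	}

	b, _ := io.ReadAll(rc)
	fmt.Println(string(b)) // item payload
}
```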

```diff
@@ -32,7 +32,8 @@ const (
 
 // common tags for filtering
 const (
 	ServiceTag = "service"
+	PartialBackupTag = "partialBackup"
 )
 
 // Valid returns true if the ModelType value fits within the iota range.
```
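
A minimal sketch (not part of this diff) of how a consumer could read the new tag back off a stored backup's Tags map, assuming the `strconv.FormatBool` encoding used later in this PR; the constants and the "onedrive" service value below are local stand-ins, not the model package's actual values.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local stand-ins for the model package's tag keys added in the hunk above.
const (
	ServiceTag       = "service"
	PartialBackupTag = "partialBackup"
)

func main() {
	// Tags roughly as they would appear on a stored backup model after this PR.
	tags := map[string]string{
		ServiceTag:       "onedrive",
		PartialBackupTag: strconv.FormatBool(true),
	}

	// A consumer can parse the tag to decide whether the backup was partial;
	// a missing or malformed tag is treated as a full backup here.
	isPartial, err := strconv.ParseBool(tags[PartialBackupTag])
	if err != nil {
		isPartial = false
	}

	fmt.Println("partial backup:", isPartial)
}
```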

```diff
@@ -227,6 +227,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		op.Errors.Fail(clues.Wrap(err, "running backup"))
 	}
 
+	isAtleastPartialBackup := deets != nil && !deets.Empty()
+
 	finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup")
 	LogFaultErrors(ctx, op.Errors.Errors(), "running backup")
 
```

```diff
@@ -235,7 +237,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	// -----
 
 	err = op.persistResults(startTime, &opStats)
-	if err != nil {
+	if err != nil && !isAtleastPartialBackup {
 		op.Errors.Fail(clues.Wrap(err, "persisting backup results"))
 		return op.Errors.Failure()
 	}
```
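
A minimal sketch (not from this PR) of the gating pattern these Run() hunks introduce, using hypothetical stand-in types: when the backup produced non-empty details, later persistence errors no longer force a failure exit.

```go
package main

import (
	"errors"
	"fmt"
)

// backupDetails is a hypothetical stand-in for details.Builder: non-empty
// entries mean the backup captured at least some items.
type backupDetails struct{ entries []string }

func (d *backupDetails) Empty() bool { return d == nil || len(d.entries) == 0 }

// persistResults stands in for op.persistResults, forced to fail for the demo.
func persistResults() error { return errors.New("transient persistence error") }

func run(deets *backupDetails) error {
	// Same shape as the diff: non-nil, non-empty details count as at least a
	// partial backup.
	isAtleastPartialBackup := deets != nil && !deets.Empty()

	// Persistence errors only abort the run when nothing was backed up.
	if err := persistResults(); err != nil && !isAtleastPartialBackup {
		return fmt.Errorf("persisting backup results: %w", err)
	}

	// Otherwise continue on to persist the (partial) backup model.
	return nil
}

func main() {
	fmt.Println(run(&backupDetails{entries: []string{"item"}})) // <nil>
	fmt.Println(run(nil))                                       // persisting backup results: ...
}
```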

```diff
@@ -250,17 +252,21 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 			Infow("completed backup; conditional error forcing exit without model persistence",
 				"results", op.Results)
 
-			return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure()
+			if !isAtleastPartialBackup {
+				return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure()
+			}
 		}
 	}
 
+	// TODO(pandeyabs): Pass a flag to mark the backup as partial
 	err = op.createBackupModels(
 		ctx,
 		sstore,
 		opStats.k.SnapshotID,
 		op.Results.BackupID,
 		op.BackupVersion,
-		deets.Details())
+		deets.Details(),
+		isAtleastPartialBackup)
 	if err != nil {
 		op.Errors.Fail(clues.Wrap(err, "persisting backup"))
 		return op.Errors.Failure()
```

```diff
@@ -344,7 +350,7 @@ func (op *BackupOperation) do(
 		backupID,
 		op.incremental && canUseMetadata && canUsePreviousBackup,
 		op.Errors)
-	if err != nil {
+	if err != nil && (deets == nil || deets.Empty()) {
 		return nil, clues.Wrap(err, "persisting collection backups")
 	}
 
```

```diff
@@ -851,6 +857,7 @@ func (op *BackupOperation) createBackupModels(
 	backupID model.StableID,
 	backupVersion int,
 	deets *details.Details,
+	isPartialBackup bool,
 ) error {
 	ctx = clues.Add(ctx, "snapshot_id", snapID, "backup_id", backupID)
 	// generate a new fault bus so that we can maintain clean
```

```diff
@@ -891,7 +898,8 @@ func (op *BackupOperation) createBackupModels(
 		op.ResourceOwner.Name(),
 		op.Results.ReadWrites,
 		op.Results.StartAndEndTime,
-		op.Errors.Errors())
+		op.Errors.Errors(),
+		isPartialBackup)
 
 	logger.Ctx(ctx).Info("creating new backup")
 
```

```diff
@@ -79,6 +79,7 @@ func New(
 	rw stats.ReadWrites,
 	se stats.StartAndEndTime,
 	fe *fault.Errors,
+	isPartialBackup bool,
 ) *Backup {
 	if fe == nil {
 		fe = &fault.Errors{}
```

```diff
@@ -115,7 +116,8 @@ func New(
 		BaseModel: model.BaseModel{
 			ID: id,
 			Tags: map[string]string{
 				model.ServiceTag: selector.PathService().String(),
+				model.PartialBackupTag: strconv.FormatBool(isPartialBackup),
 			},
 		},
 
```

```diff
@@ -401,6 +401,13 @@ func (b *Builder) Details() *Details {
 	return details
 }
 
+func (b *Builder) Empty() bool {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	return len(b.d.Entries) == 0
+}
+
 // --------------------------------------------------------------------------------
 // Details
 // --------------------------------------------------------------------------------
```

```diff
@@ -29,9 +29,14 @@ type MockExtension struct {
 	Ctx         context.Context
 	FailOnRead  bool
 	FailOnClose bool
+	ItemNumber  int32
 }
 
 func (me *MockExtension) Read(p []byte) (int, error) {
+	if me.ItemNumber > 1 {
+		return 0, clues.New("mock read error")
+	}
+
 	if me.FailOnRead {
 		return 0, clues.New("mock read error")
 	}
```

```diff
@@ -55,6 +60,11 @@ func (me *MockExtension) Read(p []byte) (int, error) {
 }
 
 func (me *MockExtension) Close() error {
+	// if me.itemCount > 3 {
+	// 	return clues.New("mock close error")
+	// }
+
+	// atomic.AddInt32(&me.itemCount, 1)
 	if me.FailOnClose {
 		return clues.New("mock close error")
 	}
```

```diff
@@ -66,6 +76,8 @@ func (me *MockExtension) Close() error {
 
 	me.ExtData.Data[KNumBytes] = me.NumBytes
 	me.ExtData.Data[KCrc32] = me.Crc32
+	me.ExtData.Data["ItemNumber"] = me.ItemNumber
+
 	logger.Ctx(me.Ctx).Infow(
 		"mock extension closed",
 		KNumBytes, me.NumBytes, KCrc32, me.Crc32)
```

```diff
@@ -77,6 +89,7 @@ type MockItemExtensionFactory struct {
 	FailOnFactoryCreation bool
 	FailOnRead            bool
 	FailOnClose           bool
+	ItemNumber            int32
 }
 
 func (m *MockItemExtensionFactory) CreateItemExtension(
```

```diff
@@ -89,6 +102,8 @@ func (m *MockItemExtensionFactory) CreateItemExtension(
 		return nil, clues.New("factory error")
 	}
 
+	logger.Ctx(ctx).Infow("mock extension created", "itemnumber", m.ItemNumber)
+
 	return &MockExtension{
 		Ctx:     ctx,
 		InnerRc: rc,
```

```diff
@@ -96,5 +111,17 @@ func (m *MockItemExtensionFactory) CreateItemExtension(
 		ExtData:     extData,
 		FailOnRead:  m.FailOnRead,
 		FailOnClose: m.FailOnClose,
+		ItemNumber:  m.ItemNumber,
 	}, nil
 }
+
+var ctr int32
+
+func GetMockExtensions() CreateItemExtensioner {
+	atomic.AddInt32(&ctr, 1)
+	mf := &MockItemExtensionFactory{
+		ItemNumber: ctr,
+	}
+
+	return mf
+}
```
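
A minimal sketch (not from this PR) of the atomic-counter pattern GetMockExtensions relies on, with stand-in types: each call bumps a shared counter and stamps the returned factory with an item number, so extensions numbered past the first are set up to fail reads.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ctr mirrors the package-level counter added in the hunk above.
var ctr int32

// mockFactory is a hypothetical stand-in for MockItemExtensionFactory.
type mockFactory struct {
	ItemNumber int32
}

// newMockFactory mirrors GetMockExtensions: every call bumps the shared counter
// and stamps the returned factory with the new value.
func newMockFactory() *mockFactory {
	n := atomic.AddInt32(&ctr, 1)

	return &mockFactory{ItemNumber: n}
}

func main() {
	for i := 0; i < 3; i++ {
		f := newMockFactory()
		// Per the Read change above, extensions numbered past the first fail.
		fmt.Println("factory", f.ItemNumber, "reads fail:", f.ItemNumber > 1)
	}
}
```

In the sketch, using the return value of AddInt32 (rather than re-reading ctr) keeps the stamp race-free if factories are ever created concurrently.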