centralize observe channel listener handling

To ensure that the observe channel handling
doesn't accidentally spawn unkillable routines,
adds a centralized channel listener func for
standard channel management in observe
progress bars.
This commit is contained in:
ryanfkeepers 2023-02-15 11:12:54 -07:00
parent c4e5fc5fc3
commit f137470b03

View File

@ -22,6 +22,9 @@ const (
progressBarWidth = 32 progressBarWidth = 32
) )
// styling
const Bullet = "∙"
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
// TODO: Revisit this being a global nd make it a parameter to the progress methods // TODO: Revisit this being a global nd make it a parameter to the progress methods
@ -168,16 +171,17 @@ func MessageWithCompletion(
ctx context.Context, ctx context.Context,
msg cleanable, msg cleanable,
) (chan<- struct{}, func()) { ) (chan<- struct{}, func()) {
clean := msg.clean() var (
message := msg.String() clean = msg.clean()
message = msg.String()
log = logger.Ctx(ctx)
ch = make(chan struct{}, 1)
)
log := logger.Ctx(ctx)
log.Info(clean) log.Info(clean)
completionCh := make(chan struct{}, 1)
if cfg.hidden() { if cfg.hidden() {
return completionCh, func() { log.Info("done - " + clean) } return ch, func() { log.Info("done - " + clean) }
} }
wg.Add(1) wg.Add(1)
@ -194,27 +198,24 @@ func MessageWithCompletion(
mpb.BarFillerOnComplete("done"), mpb.BarFillerOnComplete("done"),
) )
go func(ci <-chan struct{}) { go listen(
for { ctx,
select { ch,
case <-contxt.Done(): func() {
bar.SetTotal(-1, true) bar.SetTotal(-1, true)
case _, ok := <-ci: bar.Abort(true)
if !ok { },
return func() {
} // We don't care whether the channel was signalled or closed
// We don't care whether the channel was signalled or closed // Use either one as an indication that the bar is done
// Use either one as an indication that the bar is done bar.SetTotal(-1, true)
bar.SetTotal(-1, true) })
}
}
}(completionCh)
wacb := waitAndCloseBar(bar, func() { wacb := waitAndCloseBar(bar, func() {
log.Info("done - " + clean) log.Info("done - " + clean)
}) })
return completionCh, wacb return ch, wacb
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -231,7 +232,9 @@ func ItemProgress(
iname cleanable, iname cleanable,
totalBytes int64, totalBytes int64,
) (io.ReadCloser, func()) { ) (io.ReadCloser, func()) {
log := logger.Ctx(ctx).With("item", iname.clean(), "size", humanize.Bytes(uint64(totalBytes))) log := logger.Ctx(ctx).With(
"item", iname.clean(),
"size", humanize.Bytes(uint64(totalBytes)))
log.Debug(header) log.Debug(header)
if cfg.hidden() || rc == nil || totalBytes == 0 { if cfg.hidden() || rc == nil || totalBytes == 0 {
@ -273,23 +276,17 @@ func ProgressWithCount(
message cleanable, message cleanable,
count int64, count int64,
) (chan<- struct{}, func()) { ) (chan<- struct{}, func()) {
log := logger.Ctx(ctx) var (
lmsg := fmt.Sprintf("%s %s - %d", header, message.clean(), count) log = logger.Ctx(ctx)
lmsg = fmt.Sprintf("%s %s - %d", header, message.clean(), count)
ch = make(chan struct{})
)
log.Info(lmsg) log.Info(lmsg)
progressCh := make(chan struct{})
if cfg.hidden() { if cfg.hidden() {
go func(ci <-chan struct{}) { go listen(ctx, ch, nop, nop)
for { return ch, func() { log.Info("done - " + lmsg) }
_, ok := <-ci
if !ok {
return
}
}
}(progressCh)
return progressCh, func() { log.Info("done - " + lmsg) }
} }
wg.Add(1) wg.Add(1)
@ -308,24 +305,11 @@ func ProgressWithCount(
bar := progress.New(count, mpb.NopStyle(), barOpts...) bar := progress.New(count, mpb.NopStyle(), barOpts...)
ch := make(chan struct{}) go listen(
go func(ci <-chan struct{}) { ctx,
for { ch,
select { func() { bar.Abort(true) },
case <-contxt.Done(): bar.Increment)
bar.Abort(true)
return
case _, ok := <-ci:
if !ok {
bar.Abort(true)
return
}
bar.Increment()
}
}
}(ch)
wacb := waitAndCloseBar(bar, func() { wacb := waitAndCloseBar(bar, func() {
log.Info("done - " + lmsg) log.Info("done - " + lmsg)
@ -374,33 +358,28 @@ func CollectionProgress(
category string, category string,
user, dirName cleanable, user, dirName cleanable,
) (chan<- struct{}, func()) { ) (chan<- struct{}, func()) {
log := logger.Ctx(ctx).With( var (
"user", user.clean(), counted int
"category", category, ch = make(chan struct{})
"dir", dirName.clean()) log = logger.Ctx(ctx).With(
message := "Collecting Directory" "user", user.clean(),
"category", category,
"dir", dirName.clean())
message = "Collecting Directory"
)
log.Info(message) log.Info(message)
incCount := func() {
counted++
// Log every 1000 items that are processed
if counted%1000 == 0 {
log.Infow("uploading", "count", counted)
}
}
if cfg.hidden() || len(user.String()) == 0 || len(dirName.String()) == 0 { if cfg.hidden() || len(user.String()) == 0 || len(dirName.String()) == 0 {
ch := make(chan struct{}) go listen(ctx, ch, nop, incCount)
counted := 0
go func(ci <-chan struct{}) {
for {
_, ok := <-ci
if !ok {
return
}
counted++
// Log every 1000 items that are processed
if counted%1000 == 0 {
log.Infow("uploading", "count", counted)
}
}
}(ch)
return ch, func() { log.Infow("done - "+message, "count", counted) } return ch, func() { log.Infow("done - "+message, "count", counted) }
} }
@ -422,36 +401,16 @@ func CollectionProgress(
bar := progress.New( bar := progress.New(
-1, // -1 to indicate an unbounded count -1, // -1 to indicate an unbounded count
mpb.SpinnerStyle(spinFrames...), mpb.SpinnerStyle(spinFrames...),
barOpts..., barOpts...)
)
var counted int go listen(
ctx,
ch := make(chan struct{}) ch,
go func(ci <-chan struct{}) { func() { bar.SetTotal(-1, true) },
for { func() {
select { incCount()
case <-contxt.Done(): bar.Increment()
bar.SetTotal(-1, true) })
return
case _, ok := <-ci:
if !ok {
bar.SetTotal(-1, true)
return
}
counted++
// Log every 1000 items that are processed
if counted%1000 == 0 {
log.Infow("uploading", "count", counted)
}
bar.Increment()
}
}
}(ch)
wacb := waitAndCloseBar(bar, func() { wacb := waitAndCloseBar(bar, func() {
log.Infow("done - "+message, "count", counted) log.Infow("done - "+message, "count", counted)
@ -472,7 +431,26 @@ func waitAndCloseBar(bar *mpb.Bar, log func()) func() {
// other funcs // other funcs
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const Bullet = "∙" var nop = func() {}
// listen runs a
func listen(ctx context.Context, ci <-chan struct{}, onEnd, onInc func()) {
for {
select {
case <-ctx.Done():
onEnd()
return
case _, ok := <-ci:
if !ok {
onEnd()
return
}
onInc()
}
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// PII redaction // PII redaction