From f137470b03f672cba3489844f7e943d0ee7e48f6 Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Wed, 15 Feb 2023 11:12:54 -0700 Subject: [PATCH] centralize observe channel listener handling To ensure that the observe channel handling doesn't accidentally spawn unkillable routines, adds a centralized channel listener func for standard channel management in observe progress bars. --- src/internal/observe/observe.go | 196 ++++++++++++++------------------ 1 file changed, 87 insertions(+), 109 deletions(-) diff --git a/src/internal/observe/observe.go b/src/internal/observe/observe.go index 659ce243f..d9b160883 100644 --- a/src/internal/observe/observe.go +++ b/src/internal/observe/observe.go @@ -22,6 +22,9 @@ const ( progressBarWidth = 32 ) +// styling +const Bullet = "∙" + var ( wg sync.WaitGroup // TODO: Revisit this being a global nd make it a parameter to the progress methods @@ -168,16 +171,17 @@ func MessageWithCompletion( ctx context.Context, msg cleanable, ) (chan<- struct{}, func()) { - clean := msg.clean() - message := msg.String() + var ( + clean = msg.clean() + message = msg.String() + log = logger.Ctx(ctx) + ch = make(chan struct{}, 1) + ) - log := logger.Ctx(ctx) log.Info(clean) - completionCh := make(chan struct{}, 1) - if cfg.hidden() { - return completionCh, func() { log.Info("done - " + clean) } + return ch, func() { log.Info("done - " + clean) } } wg.Add(1) @@ -194,27 +198,24 @@ func MessageWithCompletion( mpb.BarFillerOnComplete("done"), ) - go func(ci <-chan struct{}) { - for { - select { - case <-contxt.Done(): - bar.SetTotal(-1, true) - case _, ok := <-ci: - if !ok { - return - } - // We don't care whether the channel was signalled or closed - // Use either one as an indication that the bar is done - bar.SetTotal(-1, true) - } - } - }(completionCh) + go listen( + ctx, + ch, + func() { + bar.SetTotal(-1, true) + bar.Abort(true) + }, + func() { + // We don't care whether the channel was signalled or closed + // Use either one as an indication that the bar is done + bar.SetTotal(-1, true) + }) wacb := waitAndCloseBar(bar, func() { log.Info("done - " + clean) }) - return completionCh, wacb + return ch, wacb } // --------------------------------------------------------------------------- @@ -231,7 +232,9 @@ func ItemProgress( iname cleanable, totalBytes int64, ) (io.ReadCloser, func()) { - log := logger.Ctx(ctx).With("item", iname.clean(), "size", humanize.Bytes(uint64(totalBytes))) + log := logger.Ctx(ctx).With( + "item", iname.clean(), + "size", humanize.Bytes(uint64(totalBytes))) log.Debug(header) if cfg.hidden() || rc == nil || totalBytes == 0 { @@ -273,23 +276,17 @@ func ProgressWithCount( message cleanable, count int64, ) (chan<- struct{}, func()) { - log := logger.Ctx(ctx) - lmsg := fmt.Sprintf("%s %s - %d", header, message.clean(), count) + var ( + log = logger.Ctx(ctx) + lmsg = fmt.Sprintf("%s %s - %d", header, message.clean(), count) + ch = make(chan struct{}) + ) + log.Info(lmsg) - progressCh := make(chan struct{}) - if cfg.hidden() { - go func(ci <-chan struct{}) { - for { - _, ok := <-ci - if !ok { - return - } - } - }(progressCh) - - return progressCh, func() { log.Info("done - " + lmsg) } + go listen(ctx, ch, nop, nop) + return ch, func() { log.Info("done - " + lmsg) } } wg.Add(1) @@ -308,24 +305,11 @@ func ProgressWithCount( bar := progress.New(count, mpb.NopStyle(), barOpts...) - ch := make(chan struct{}) - go func(ci <-chan struct{}) { - for { - select { - case <-contxt.Done(): - bar.Abort(true) - return - - case _, ok := <-ci: - if !ok { - bar.Abort(true) - return - } - - bar.Increment() - } - } - }(ch) + go listen( + ctx, + ch, + func() { bar.Abort(true) }, + bar.Increment) wacb := waitAndCloseBar(bar, func() { log.Info("done - " + lmsg) @@ -374,33 +358,28 @@ func CollectionProgress( category string, user, dirName cleanable, ) (chan<- struct{}, func()) { - log := logger.Ctx(ctx).With( - "user", user.clean(), - "category", category, - "dir", dirName.clean()) - message := "Collecting Directory" + var ( + counted int + ch = make(chan struct{}) + log = logger.Ctx(ctx).With( + "user", user.clean(), + "category", category, + "dir", dirName.clean()) + message = "Collecting Directory" + ) + log.Info(message) + incCount := func() { + counted++ + // Log every 1000 items that are processed + if counted%1000 == 0 { + log.Infow("uploading", "count", counted) + } + } + if cfg.hidden() || len(user.String()) == 0 || len(dirName.String()) == 0 { - ch := make(chan struct{}) - - counted := 0 - - go func(ci <-chan struct{}) { - for { - _, ok := <-ci - if !ok { - return - } - counted++ - - // Log every 1000 items that are processed - if counted%1000 == 0 { - log.Infow("uploading", "count", counted) - } - } - }(ch) - + go listen(ctx, ch, nop, incCount) return ch, func() { log.Infow("done - "+message, "count", counted) } } @@ -422,36 +401,16 @@ func CollectionProgress( bar := progress.New( -1, // -1 to indicate an unbounded count mpb.SpinnerStyle(spinFrames...), - barOpts..., - ) + barOpts...) - var counted int - - ch := make(chan struct{}) - go func(ci <-chan struct{}) { - for { - select { - case <-contxt.Done(): - bar.SetTotal(-1, true) - return - - case _, ok := <-ci: - if !ok { - bar.SetTotal(-1, true) - return - } - - counted++ - - // Log every 1000 items that are processed - if counted%1000 == 0 { - log.Infow("uploading", "count", counted) - } - - bar.Increment() - } - } - }(ch) + go listen( + ctx, + ch, + func() { bar.SetTotal(-1, true) }, + func() { + incCount() + bar.Increment() + }) wacb := waitAndCloseBar(bar, func() { log.Infow("done - "+message, "count", counted) @@ -472,7 +431,26 @@ func waitAndCloseBar(bar *mpb.Bar, log func()) func() { // other funcs // --------------------------------------------------------------------------- -const Bullet = "∙" +var nop = func() {} + +// listen runs a +func listen(ctx context.Context, ci <-chan struct{}, onEnd, onInc func()) { + for { + select { + case <-ctx.Done(): + onEnd() + return + + case _, ok := <-ci: + if !ok { + onEnd() + return + } + + onInc() + } + } +} // --------------------------------------------------------------------------- // PII redaction