Progress bar improvements (#1302)
## Description This adds the following functionality to the CLI progress output: - Add a message e.g. to describe a completed stage - Add a message describing a stage in progress (using a spinner) and trigger completion - Add a progress tracker (as a counter) for a specified number of items (when the # items is known up front - Improves `ItemProgress` by aligning the columns a bit better and also removing the progress bar and replacing with a byte counter Finally - the above are used in the `backup` and `backup create onedrive` flows. Follow up PRs will wire these up for exchange and the restore flows also. ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * #1278 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
f58db6c3a7
commit
562c468a91
@ -22,7 +22,7 @@ require (
|
||||
github.com/stretchr/testify v1.8.0
|
||||
github.com/tidwall/pretty v1.2.0
|
||||
github.com/tomlazar/table v0.1.2
|
||||
github.com/vbauerster/mpb/v8 v8.1.2
|
||||
github.com/vbauerster/mpb/v8 v8.1.4
|
||||
go.uber.org/zap v1.23.0
|
||||
golang.org/x/tools v0.1.12
|
||||
gopkg.in/resty.v1 v1.12.0
|
||||
@ -107,7 +107,7 @@ require (
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||
golang.org/x/net v0.0.0-20220927171203-f486391704dc // indirect
|
||||
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect
|
||||
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
|
||||
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220927151529-dcaddaf36704 // indirect
|
||||
google.golang.org/grpc v1.49.0 // indirect
|
||||
|
||||
@ -372,8 +372,8 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tomlazar/table v0.1.2 h1:DP8f62FzZAZk8oavepm1v/oyf4ni3/LMHWNlOinmleg=
|
||||
github.com/tomlazar/table v0.1.2/go.mod h1:IecZnpep9f/BatHacfh+++ftE+lFONN8BVPi9nx5U1w=
|
||||
github.com/vbauerster/mpb/v8 v8.1.2 h1:plrnwcqEj+dNce8q1zz0lU9x6DYnjDIdDhqbSqRU7cc=
|
||||
github.com/vbauerster/mpb/v8 v8.1.2/go.mod h1:qn8FMJ7ypuwaDVkgA3rR1ooRzGsneXBjdskm6alegYo=
|
||||
github.com/vbauerster/mpb/v8 v8.1.4 h1:MOcLTIbbAA892wVjRiuFHa1nRlNvifQMDVh12Bq/xIs=
|
||||
github.com/vbauerster/mpb/v8 v8.1.4/go.mod h1:2fRME8lCLU9gwJwghZb1bO9A3Plc8KPeQ/ayGj+Ek4I=
|
||||
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g=
|
||||
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM=
|
||||
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
|
||||
@ -564,8 +564,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI=
|
||||
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 h1:OK7RB6t2WQX54srQQYSXMW8dF5C6/8+oA/s5QBmmto4=
|
||||
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
@ -297,6 +298,12 @@ func (gc *GraphConnector) createCollections(
|
||||
Credentials: gc.credentials,
|
||||
}
|
||||
|
||||
itemCategory := graph.ScopeToPathCategory(qp.Scope)
|
||||
|
||||
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user))
|
||||
defer closer()
|
||||
defer close(foldersComplete)
|
||||
|
||||
resolver, err := exchange.PopulateExchangeContainerResolver(
|
||||
ctx,
|
||||
qp,
|
||||
@ -317,6 +324,8 @@ func (gc *GraphConnector) createCollections(
|
||||
return nil, errors.Wrap(err, "filling collections")
|
||||
}
|
||||
|
||||
foldersComplete <- struct{}{}
|
||||
|
||||
for _, collection := range collections {
|
||||
gc.incrementAwaitingMessages()
|
||||
|
||||
|
||||
@ -121,6 +121,14 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
folderProgress, colCloser := observe.ProgressWithCount(
|
||||
observe.ItemQueueMsg,
|
||||
"Folder: /"+parentPathString,
|
||||
int64(len(oc.driveItemIDs)),
|
||||
)
|
||||
defer colCloser()
|
||||
defer close(folderProgress)
|
||||
|
||||
for _, itemID := range oc.driveItemIDs {
|
||||
// Read the item
|
||||
itemInfo, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID)
|
||||
@ -139,7 +147,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
byteCount += itemInfo.Size
|
||||
|
||||
itemInfo.ParentPath = parentPathString
|
||||
progReader, closer := observe.ItemProgress(itemData, itemInfo.ItemName, itemInfo.Size)
|
||||
progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size)
|
||||
|
||||
go closer()
|
||||
|
||||
@ -148,6 +156,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
data: progReader,
|
||||
info: itemInfo,
|
||||
}
|
||||
folderProgress <- struct{}{}
|
||||
}
|
||||
|
||||
oc.reportAsCompleted(ctx, itemsRead, byteCount, errs)
|
||||
|
||||
@ -2,6 +2,7 @@ package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -10,6 +11,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
@ -66,6 +68,8 @@ func (c *Collections) Get(ctx context.Context) ([]data.Collection, error) {
|
||||
}
|
||||
}
|
||||
|
||||
observe.Message(fmt.Sprintf("Discovered %d items to backup", c.numItems))
|
||||
|
||||
collections := make([]data.Collection, 0, len(c.collectionMap))
|
||||
for _, coll := range c.collectionMap {
|
||||
collections = append(collections, coll)
|
||||
@ -172,7 +176,5 @@ func includePath(ctx context.Context, scope selectors.OneDriveScope, folderPath
|
||||
return true
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Infof("Checking path %q", folderPathString)
|
||||
|
||||
return scope.Matches(selectors.OneDriveFolder, folderPathString)
|
||||
}
|
||||
|
||||
@ -252,7 +252,7 @@ func restoreItem(
|
||||
}
|
||||
|
||||
iReader := itemData.ToReader()
|
||||
progReader, closer := observe.ItemProgress(iReader, itemName, ss.Size())
|
||||
progReader, closer := observe.ItemProgress(iReader, observe.ItemRestoreMsg, itemName, ss.Size())
|
||||
|
||||
go closer()
|
||||
|
||||
|
||||
@ -19,7 +19,9 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
// TODO: Revisit this being a global nd make it a parameter to the progress methods
|
||||
// so that each bar can be initialized with different contexts if needed.
|
||||
con context.Context
|
||||
writer io.Writer
|
||||
progress *mpb.Progress
|
||||
@ -106,36 +108,159 @@ func Complete() {
|
||||
SeedWriter(con, writer, cfg.doNotDisplay)
|
||||
}
|
||||
|
||||
const (
|
||||
ItemBackupMsg = "Backing up item:"
|
||||
ItemRestoreMsg = "Restoring item:"
|
||||
ItemQueueMsg = "Queuing items:"
|
||||
)
|
||||
|
||||
// Progress Updates
|
||||
|
||||
// Message is used to display a progress message
|
||||
func Message(message string) {
|
||||
if writer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
bar := progress.New(
|
||||
-1,
|
||||
mpb.NopStyle(),
|
||||
mpb.PrependDecorators(
|
||||
decor.Name(message, decor.WC{W: len(message) + 1, C: decor.DidentRight}),
|
||||
),
|
||||
)
|
||||
|
||||
// Complete the bar immediately
|
||||
bar.SetTotal(-1, true)
|
||||
|
||||
waitAndCloseBar(bar)()
|
||||
}
|
||||
|
||||
// MessageWithCompletion is used to display progress with a spinner
|
||||
// that switches to "done" when the completion channel is signalled
|
||||
func MessageWithCompletion(message string) (chan<- struct{}, func()) {
|
||||
completionCh := make(chan struct{}, 1)
|
||||
|
||||
if writer == nil {
|
||||
return completionCh, func() {}
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
frames := []string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"}
|
||||
|
||||
bar := progress.New(
|
||||
-1,
|
||||
mpb.SpinnerStyle(frames...).PositionLeft(),
|
||||
mpb.PrependDecorators(
|
||||
decor.Name(message),
|
||||
),
|
||||
mpb.BarFillerOnComplete("done"),
|
||||
)
|
||||
|
||||
go func(ci <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-con.Done():
|
||||
bar.SetTotal(-1, true)
|
||||
case <-ci:
|
||||
// We don't care whether the channel was signalled or closed
|
||||
// Use either one as an indication that the bar is done
|
||||
bar.SetTotal(-1, true)
|
||||
}
|
||||
}
|
||||
}(completionCh)
|
||||
|
||||
return completionCh, waitAndCloseBar(bar)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Progress for Known Quantities
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// ItemProgress tracks the display of an item by counting the bytes
|
||||
// ItemProgress tracks the display of an item in a folder by counting the bytes
|
||||
// read through the provided readcloser, up until the byte count matches
|
||||
// the totalBytes.
|
||||
func ItemProgress(rc io.ReadCloser, iname string, totalBytes int64) (io.ReadCloser, func()) {
|
||||
func ItemProgress(rc io.ReadCloser, header, iname string, totalBytes int64) (io.ReadCloser, func()) {
|
||||
if cfg.doNotDisplay || writer == nil || rc == nil || totalBytes == 0 {
|
||||
return rc, func() {}
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
bar := progress.AddBar(
|
||||
bar := progress.New(
|
||||
totalBytes,
|
||||
mpb.BarFillerOnComplete(""),
|
||||
mpb.NopStyle(),
|
||||
mpb.BarRemoveOnComplete(),
|
||||
mpb.PrependDecorators(
|
||||
decor.OnComplete(decor.NewPercentage("%d", decor.WC{W: 4}), ""),
|
||||
decor.OnComplete(decor.TotalKiloByte("%.1f", decor.WCSyncSpace), ""),
|
||||
),
|
||||
mpb.AppendDecorators(
|
||||
decor.OnComplete(decor.Name(iname), ""),
|
||||
decor.Name(header, decor.WCSyncSpaceR),
|
||||
decor.Name(iname, decor.WCSyncSpaceR),
|
||||
decor.CountersKibiByte(" %.1f/%.1f ", decor.WC{W: 8}),
|
||||
decor.NewPercentage("%d ", decor.WC{W: 4}),
|
||||
),
|
||||
)
|
||||
|
||||
return bar.ProxyReader(rc), waitAndCloseBar(bar)
|
||||
}
|
||||
|
||||
// ProgressWithCount tracks the display of a bar that tracks the completion
|
||||
// of the specified count.
|
||||
// Each write to the provided channel counts as a single increment.
|
||||
// The caller is expected to close the channel.
|
||||
func ProgressWithCount(header, message string, count int64) (chan<- struct{}, func()) {
|
||||
progressCh := make(chan struct{})
|
||||
|
||||
if cfg.doNotDisplay || writer == nil {
|
||||
go func(ci <-chan struct{}) {
|
||||
for {
|
||||
_, ok := <-ci
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(progressCh)
|
||||
|
||||
return progressCh, func() {}
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
bar := progress.New(
|
||||
count,
|
||||
mpb.NopStyle(),
|
||||
mpb.BarRemoveOnComplete(),
|
||||
mpb.PrependDecorators(
|
||||
decor.Name(header, decor.WCSyncSpaceR),
|
||||
decor.Counters(0, " %d/%d "),
|
||||
decor.Name(message),
|
||||
),
|
||||
)
|
||||
|
||||
ch := make(chan struct{})
|
||||
|
||||
go func(ci <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-con.Done():
|
||||
bar.Abort(true)
|
||||
return
|
||||
|
||||
case _, ok := <-ci:
|
||||
if !ok {
|
||||
bar.Abort(true)
|
||||
return
|
||||
}
|
||||
|
||||
bar.Increment()
|
||||
}
|
||||
}
|
||||
}(ch)
|
||||
|
||||
return ch, waitAndCloseBar(bar)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Progress for Unknown Quantities
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -168,7 +293,7 @@ func makeSpinFrames(barWidth int) {
|
||||
spinFrames = sl
|
||||
}
|
||||
|
||||
// ItemProgress tracks the display a spinner that idles while the collection
|
||||
// CollectionProgress tracks the display a spinner that idles while the collection
|
||||
// incrementing the count of items handled. Each write to the provided channel
|
||||
// counts as a single increment. The caller is expected to close the channel.
|
||||
func CollectionProgress(user, category, dirName string) (chan<- struct{}, func()) {
|
||||
@ -192,16 +317,13 @@ func CollectionProgress(user, category, dirName string) (chan<- struct{}, func()
|
||||
bar := progress.New(
|
||||
-1, // -1 to indicate an unbounded count
|
||||
mpb.SpinnerStyle(spinFrames...),
|
||||
mpb.BarFillerOnComplete(""),
|
||||
mpb.BarRemoveOnComplete(),
|
||||
mpb.PrependDecorators(
|
||||
decor.OnComplete(decor.Name(category), ""),
|
||||
decor.Name(category),
|
||||
),
|
||||
mpb.AppendDecorators(
|
||||
decor.OnComplete(decor.CurrentNoUnit("%d - ", decor.WCSyncSpace), ""),
|
||||
decor.OnComplete(
|
||||
decor.Name(fmt.Sprintf("%s - %s", user, dirName)),
|
||||
""),
|
||||
decor.CurrentNoUnit("%d - ", decor.WCSyncSpace),
|
||||
decor.Name(fmt.Sprintf("%s - %s", user, dirName)),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -44,6 +45,7 @@ func (suite *ObserveProgressUnitSuite) TestItemProgress() {
|
||||
from := make([]byte, 100)
|
||||
prog, closer := observe.ItemProgress(
|
||||
io.NopCloser(bytes.NewReader(from)),
|
||||
"folder",
|
||||
"test",
|
||||
100)
|
||||
require.NotNil(t, prog)
|
||||
@ -145,3 +147,179 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnChannelCl
|
||||
// blocks, but should resolve due to the cancel
|
||||
closer()
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgress() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
message := "Test Message"
|
||||
|
||||
observe.Message(message)
|
||||
observe.Complete()
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCompletion() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
message := "Test Message"
|
||||
|
||||
ch, closer := observe.MessageWithCompletion(message)
|
||||
|
||||
// Trigger completion
|
||||
ch <- struct{}{}
|
||||
|
||||
// Run the closer - this should complete because the bar was compelted above
|
||||
closer()
|
||||
|
||||
observe.Complete()
|
||||
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
require.Contains(suite.T(), recorder.String(), "done")
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgressWithChannelClosed() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
message := "Test Message"
|
||||
|
||||
ch, closer := observe.MessageWithCompletion(message)
|
||||
|
||||
// Close channel without completing
|
||||
close(ch)
|
||||
|
||||
// Run the closer - this should complete because the channel was closed above
|
||||
closer()
|
||||
|
||||
observe.Complete()
|
||||
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
require.Contains(suite.T(), recorder.String(), "done")
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgressWithContextCancelled() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
message := "Test Message"
|
||||
|
||||
_, closer := observe.MessageWithCompletion(message)
|
||||
|
||||
// cancel context
|
||||
cancel()
|
||||
|
||||
// Run the closer - this should complete because the context was closed above
|
||||
closer()
|
||||
|
||||
observe.Complete()
|
||||
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCount() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
header := "Header"
|
||||
message := "Test Message"
|
||||
count := 3
|
||||
|
||||
ch, closer := observe.ProgressWithCount(header, message, int64(count))
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
ch <- struct{}{}
|
||||
}
|
||||
|
||||
// Run the closer - this should complete because the context was closed above
|
||||
closer()
|
||||
|
||||
observe.Complete()
|
||||
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
require.Contains(suite.T(), recorder.String(), fmt.Sprintf("%d/%d", count, count))
|
||||
}
|
||||
|
||||
func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCountChannelClosed() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
recorder := strings.Builder{}
|
||||
observe.SeedWriter(ctx, &recorder, false)
|
||||
|
||||
defer func() {
|
||||
// don't cross-contaminate other tests.
|
||||
//nolint:forbidigo
|
||||
observe.SeedWriter(context.Background(), nil, false)
|
||||
}()
|
||||
|
||||
header := "Header"
|
||||
message := "Test Message"
|
||||
count := 3
|
||||
|
||||
ch, closer := observe.ProgressWithCount(header, message, int64(count))
|
||||
|
||||
close(ch)
|
||||
|
||||
// Run the closer - this should complete because the context was closed above
|
||||
closer()
|
||||
|
||||
observe.Complete()
|
||||
|
||||
require.NotEmpty(suite.T(), recorder.String())
|
||||
require.Contains(suite.T(), recorder.String(), message)
|
||||
require.Contains(suite.T(), recorder.String(), fmt.Sprintf("%d/%d", 0, count))
|
||||
}
|
||||
|
||||
@ -122,6 +122,10 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
complete, closer := observe.MessageWithCompletion("Connecting to M365:")
|
||||
defer closer()
|
||||
defer close(complete)
|
||||
|
||||
// retrieve data from the producer
|
||||
gc, err := connector.NewGraphConnector(ctx, op.account)
|
||||
if err != nil {
|
||||
@ -130,6 +134,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
|
||||
return err
|
||||
}
|
||||
complete <- struct{}{}
|
||||
|
||||
discoverCh, closer := observe.MessageWithCompletion("Discovering items to backup:")
|
||||
defer closer()
|
||||
defer close(discoverCh)
|
||||
|
||||
cs, err := gc.DataCollections(ctx, op.Selectors)
|
||||
if err != nil {
|
||||
@ -139,8 +148,14 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
discoverCh <- struct{}{}
|
||||
|
||||
opStats.resourceCount = len(data.ResourceOwnerSet(cs))
|
||||
|
||||
backupCh, closer := observe.MessageWithCompletion("Backing up data:")
|
||||
defer closer()
|
||||
defer close(backupCh)
|
||||
|
||||
// hand the results to the consumer
|
||||
opStats.k, backupDetails, err = op.kopia.BackupCollections(ctx, cs, op.Selectors.PathService())
|
||||
if err != nil {
|
||||
@ -149,6 +164,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
||||
|
||||
return err
|
||||
}
|
||||
backupCh <- struct{}{}
|
||||
|
||||
opStats.started = true
|
||||
opStats.gc = gc.AwaitStatus()
|
||||
|
||||
@ -2,6 +2,7 @@ package operations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime/trace"
|
||||
"time"
|
||||
|
||||
@ -15,6 +16,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/events"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
@ -105,6 +107,9 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
)
|
||||
|
||||
defer func() {
|
||||
// wait for the progress display to clean up
|
||||
observe.Complete()
|
||||
|
||||
err = op.persistResults(ctx, startTime, &opStats)
|
||||
if err != nil {
|
||||
return
|
||||
@ -137,7 +142,11 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Infof("Discovered %d items in backup %s to restore", len(paths), op.BackupID)
|
||||
observe.Message(fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID))
|
||||
|
||||
kopiaComplete, closer := observe.MessageWithCompletion("Enumerating items in repository:")
|
||||
defer closer()
|
||||
defer close(kopiaComplete)
|
||||
|
||||
dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead)
|
||||
if err != nil {
|
||||
@ -146,10 +155,15 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
|
||||
return nil, err
|
||||
}
|
||||
kopiaComplete <- struct{}{}
|
||||
|
||||
opStats.cs = dcs
|
||||
opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
|
||||
|
||||
gcComplete, closer := observe.MessageWithCompletion("Connecting to M365:")
|
||||
defer closer()
|
||||
defer close(gcComplete)
|
||||
|
||||
// restore those collections using graph
|
||||
gc, err := connector.NewGraphConnector(ctx, op.account)
|
||||
if err != nil {
|
||||
@ -158,6 +172,11 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
|
||||
return nil, err
|
||||
}
|
||||
gcComplete <- struct{}{}
|
||||
|
||||
restoreComplete, closer := observe.MessageWithCompletion("Restoring data:")
|
||||
defer closer()
|
||||
defer close(restoreComplete)
|
||||
|
||||
restoreDetails, err = gc.RestoreDataCollections(ctx, op.Selectors, op.Destination, dcs)
|
||||
if err != nil {
|
||||
@ -166,6 +185,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
|
||||
return nil, err
|
||||
}
|
||||
restoreComplete <- struct{}{}
|
||||
|
||||
opStats.started = true
|
||||
opStats.gc = gc.AwaitStatus()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user