diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index 33467411a..719764d35 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -251,7 +251,7 @@ func createCollections( Credentials: creds, } - foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", qp.Category, user)) + foldersComplete, closer := observe.MessageWithCompletion(ctx, observe.Bulletf("%s - %s", qp.Category, user)) defer closer() defer close(foldersComplete) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index c98854f4a..7fc3faadd 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -195,7 +195,11 @@ func (col *Collection) streamItems(ctx context.Context) { if len(col.added)+len(col.removed) > 0 { var closer func() - colProgress, closer = observe.CollectionProgress(user, col.fullPath.Category().String(), col.fullPath.Folder()) + colProgress, closer = observe.CollectionProgress( + ctx, + user, + col.fullPath.Category().String(), + col.fullPath.Folder()) go closer() @@ -320,7 +324,7 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB }, errs, col.fullPath.Folder()) - logger.Ctx(ctx).Debug(status.String()) + logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) col.statusUpdater(status) } diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index cdf4541a9..06c56ac9d 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -365,7 +365,7 @@ func restoreCollection( user = directory.ResourceOwner() ) - colProgress, closer := observe.CollectionProgress(user, category.String(), directory.Folder()) + colProgress, closer := observe.CollectionProgress(ctx, user, category.String(), directory.Folder()) defer closer() defer close(colProgress) diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 22ac8d746..4ea9ea9eb 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -182,10 +182,10 @@ func (oc *Collection) populateItems(ctx context.Context) { } folderProgress, colCloser := observe.ProgressWithCount( + ctx, observe.ItemQueueMsg, "/"+parentPathString, - int64(len(oc.driveItems)), - ) + int64(len(oc.driveItems))) defer colCloser() defer close(folderProgress) @@ -253,7 +253,7 @@ func (oc *Collection) populateItems(ctx context.Context) { } itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemName, itemSize) + progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize) go closer() return progReader, nil }) @@ -290,6 +290,6 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byte errs, oc.folderPath.Folder(), // Additional details ) - logger.Ctx(ctx).Debug(status.String()) + logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) oc.statusUpdater(status) } diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index f7e3d9290..d3528cb02 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -89,7 +89,7 @@ func (c *Collections) Get(ctx context.Context) ([]data.Collection, error) { } } - observe.Message(fmt.Sprintf("Discovered %d items to backup", c.NumItems)) + observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems)) collections := make([]data.Collection, 0, len(c.CollectionMap)) for _, coll := range c.CollectionMap { diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go index 75dd6fa07..31149c7aa 100644 --- a/src/internal/connector/onedrive/restore.go +++ b/src/internal/connector/onedrive/restore.go @@ -99,7 +99,10 @@ func RestoreCollection( restoreFolderElements = append(restoreFolderElements, drivePath.Folders...) trace.Log(ctx, "gc:oneDrive:restoreCollection", directory.String()) - logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements) + logger.Ctx(ctx).Infow( + "restoring to destination", + "origin", dc.FullPath().Folder(), + "destination", restoreFolderElements) // Create restore folders and get the folder ID of the folder the data stream will be restored in restoreFolderID, err := CreateRestoreFolders(ctx, service, drivePath.DriveID, restoreFolderElements) @@ -195,7 +198,11 @@ func CreateRestoreFolders(ctx context.Context, service graph.Servicer, driveID s ) } - logger.Ctx(ctx).Debugf("Resolved %s in %s to %s", folder, parentFolderID, *folderItem.GetId()) + logger.Ctx(ctx).Debugw("resolved restore destination", + "dest_name", folder, + "parent", parentFolderID, + "dest_id", *folderItem.GetId()) + parentFolderID = *folderItem.GetId() } @@ -236,7 +243,7 @@ func restoreItem( } iReader := itemData.ToReader() - progReader, closer := observe.ItemProgress(iReader, observe.ItemRestoreMsg, itemName, ss.Size()) + progReader, closer := observe.ItemProgress(ctx, iReader, observe.ItemRestoreMsg, itemName, ss.Size()) go closer() diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index 14d0beb34..ff6af4132 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -156,7 +156,11 @@ func (sc *Collection) populate(ctx context.Context) { ) // TODO: Insert correct ID for CollectionProgress - colProgress, closer := observe.CollectionProgress("name", sc.fullPath.Category().String(), sc.fullPath.Folder()) + colProgress, closer := observe.CollectionProgress( + ctx, + "name", + sc.fullPath.Category().String(), + sc.fullPath.Folder()) go closer() defer func() { diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index 76b0287b7..d7c6547a0 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -2,7 +2,6 @@ package sharepoint import ( "context" - "fmt" "github.com/pkg/errors" @@ -43,8 +42,8 @@ func DataCollections( ) for _, scope := range b.Scopes() { - foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf( - "∙ %s - %s:", + foldersComplete, closer := observe.MessageWithCompletion(ctx, observe.Bulletf( + "%s - %s", scope.Category().PathType(), site)) defer closer() defer close(foldersComplete) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index a64715fc3..6a99b9898 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -286,7 +286,6 @@ func collectionEntries( continue } - log.Debugw("reading item", "path", itemPath.String()) trace.Log(ctx, "kopia:streamEntries:item", itemPath.String()) if e.Deleted() { diff --git a/src/internal/observe/observe.go b/src/internal/observe/observe.go index 9dd9a68fa..6d648be5f 100644 --- a/src/internal/observe/observe.go +++ b/src/internal/observe/observe.go @@ -7,10 +7,13 @@ import ( "os" "sync" + "github.com/dustin/go-humanize" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" + + "github.com/alcionai/corso/src/pkg/logger" ) const ( @@ -127,15 +130,17 @@ func Complete() { } const ( - ItemBackupMsg = "Backing up item:" - ItemRestoreMsg = "Restoring item:" - ItemQueueMsg = "Queuing items:" + ItemBackupMsg = "Backing up item" + ItemRestoreMsg = "Restoring item" + ItemQueueMsg = "Queuing items" ) // Progress Updates // Message is used to display a progress message -func Message(message string) { +func Message(ctx context.Context, message string) { + logger.Ctx(ctx).Info(message) + if cfg.hidden() { return } @@ -153,12 +158,15 @@ func Message(message string) { // Complete the bar immediately bar.SetTotal(-1, true) - waitAndCloseBar(bar)() + waitAndCloseBar(bar, func() {})() } // MessageWithCompletion is used to display progress with a spinner // that switches to "done" when the completion channel is signalled -func MessageWithCompletion(message string) (chan<- struct{}, func()) { +func MessageWithCompletion(ctx context.Context, message string) (chan<- struct{}, func()) { + log := logger.Ctx(ctx) + log.Info(message) + completionCh := make(chan struct{}, 1) if cfg.hidden() { @@ -173,7 +181,7 @@ func MessageWithCompletion(message string) (chan<- struct{}, func()) { -1, mpb.SpinnerStyle(frames...).PositionLeft(), mpb.PrependDecorators( - decor.Name(message), + decor.Name(message+":"), decor.Elapsed(decor.ET_STYLE_GO, decor.WC{W: 8}), ), mpb.BarFillerOnComplete("done"), @@ -192,7 +200,11 @@ func MessageWithCompletion(message string) (chan<- struct{}, func()) { } }(completionCh) - return completionCh, waitAndCloseBar(bar) + wacb := waitAndCloseBar(bar, func() { + log.Info("done - " + message) + }) + + return completionCh, wacb } // --------------------------------------------------------------------------- @@ -202,7 +214,15 @@ func MessageWithCompletion(message string) (chan<- struct{}, func()) { // ItemProgress tracks the display of an item in a folder by counting the bytes // read through the provided readcloser, up until the byte count matches // the totalBytes. -func ItemProgress(rc io.ReadCloser, header, iname string, totalBytes int64) (io.ReadCloser, func()) { +func ItemProgress( + ctx context.Context, + rc io.ReadCloser, + header, iname string, + totalBytes int64, +) (io.ReadCloser, func()) { + log := logger.Ctx(ctx).With("item", iname, "size", humanize.Bytes(uint64(totalBytes))) + log.Debug(header) + if cfg.hidden() || rc == nil || totalBytes == 0 { return rc, func() {} } @@ -224,14 +244,23 @@ func ItemProgress(rc io.ReadCloser, header, iname string, totalBytes int64) (io. bar := progress.New(totalBytes, mpb.NopStyle(), barOpts...) - return bar.ProxyReader(rc), waitAndCloseBar(bar) + wacb := waitAndCloseBar(bar, func() { + // might be overly chatty, we can remove if needed. + log.Debug("done - " + header) + }) + + return bar.ProxyReader(rc), wacb } // ProgressWithCount tracks the display of a bar that tracks the completion // of the specified count. // Each write to the provided channel counts as a single increment. // The caller is expected to close the channel. -func ProgressWithCount(header, message string, count int64) (chan<- struct{}, func()) { +func ProgressWithCount(ctx context.Context, header, message string, count int64) (chan<- struct{}, func()) { + log := logger.Ctx(ctx) + lmsg := fmt.Sprintf("%s %s - %d", header, message, count) + log.Info(lmsg) + progressCh := make(chan struct{}) if cfg.hidden() { @@ -282,7 +311,11 @@ func ProgressWithCount(header, message string, count int64) (chan<- struct{}, fu } }(ch) - return ch, waitAndCloseBar(bar) + wacb := waitAndCloseBar(bar, func() { + log.Info("done - " + lmsg) + }) + + return ch, wacb } // --------------------------------------------------------------------------- @@ -320,7 +353,14 @@ func makeSpinFrames(barWidth int) { // CollectionProgress tracks the display a spinner that idles while the collection // incrementing the count of items handled. Each write to the provided channel // counts as a single increment. The caller is expected to close the channel. -func CollectionProgress(user, category, dirName string) (chan<- struct{}, func()) { +func CollectionProgress( + ctx context.Context, + user, category, dirName string, +) (chan<- struct{}, func()) { + log := logger.Ctx(ctx).With("user", user, "category", category, "dir", dirName) + message := "Collecting " + dirName + log.Info(message) + if cfg.hidden() || len(user) == 0 || len(dirName) == 0 { ch := make(chan struct{}) @@ -357,6 +397,8 @@ func CollectionProgress(user, category, dirName string) (chan<- struct{}, func() barOpts..., ) + var counted int + ch := make(chan struct{}) go func(ci <-chan struct{}) { for { @@ -371,17 +413,34 @@ func CollectionProgress(user, category, dirName string) (chan<- struct{}, func() return } + counted++ + bar.Increment() } } }(ch) - return ch, waitAndCloseBar(bar) + wacb := waitAndCloseBar(bar, func() { + log.Infow("done - "+message, "count", counted) + }) + + return ch, wacb } -func waitAndCloseBar(bar *mpb.Bar) func() { +func waitAndCloseBar(bar *mpb.Bar, log func()) func() { return func() { bar.Wait() wg.Done() + log() } } + +// --------------------------------------------------------------------------- +// other funcs +// --------------------------------------------------------------------------- + +// Bulletf prepends the message with "∙ ", and formats it. +// Ex: Bulletf("%s", "foo") => "∙ foo" +func Bulletf(template string, vs ...any) string { + return fmt.Sprintf("∙ "+template, vs...) +} diff --git a/src/internal/observe/observe_test.go b/src/internal/observe/observe_test.go index 96809a235..681cbeaf5 100644 --- a/src/internal/observe/observe_test.go +++ b/src/internal/observe/observe_test.go @@ -44,6 +44,7 @@ func (suite *ObserveProgressUnitSuite) TestItemProgress() { from := make([]byte, 100) prog, closer := observe.ItemProgress( + ctx, io.NopCloser(bytes.NewReader(from)), "folder", "test", @@ -96,7 +97,7 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnCtxCancel observe.SeedWriter(context.Background(), nil, nil) }() - progCh, closer := observe.CollectionProgress("test", "testcat", "testertons") + progCh, closer := observe.CollectionProgress(ctx, "test", "testcat", "testertons") require.NotNil(t, progCh) require.NotNil(t, closer) @@ -131,7 +132,7 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnChannelCl observe.SeedWriter(context.Background(), nil, nil) }() - progCh, closer := observe.CollectionProgress("test", "testcat", "testertons") + progCh, closer := observe.CollectionProgress(ctx, "test", "testcat", "testertons") require.NotNil(t, progCh) require.NotNil(t, closer) @@ -163,7 +164,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgress() { message := "Test Message" - observe.Message(message) + observe.Message(ctx, message) observe.Complete() require.NotEmpty(suite.T(), recorder.String()) require.Contains(suite.T(), recorder.String(), message) @@ -184,7 +185,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCompletion() { message := "Test Message" - ch, closer := observe.MessageWithCompletion(message) + ch, closer := observe.MessageWithCompletion(ctx, message) // Trigger completion ch <- struct{}{} @@ -214,7 +215,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithChannelClosed() { message := "Test Message" - ch, closer := observe.MessageWithCompletion(message) + ch, closer := observe.MessageWithCompletion(ctx, message) // Close channel without completing close(ch) @@ -246,7 +247,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithContextCancelled() message := "Test Message" - _, closer := observe.MessageWithCompletion(message) + _, closer := observe.MessageWithCompletion(ctx, message) // cancel context cancel() @@ -277,7 +278,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCount() { message := "Test Message" count := 3 - ch, closer := observe.ProgressWithCount(header, message, int64(count)) + ch, closer := observe.ProgressWithCount(ctx, header, message, int64(count)) for i := 0; i < count; i++ { ch <- struct{}{} @@ -310,7 +311,7 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCountChannelClosed message := "Test Message" count := 3 - ch, closer := observe.ProgressWithCount(header, message, int64(count)) + ch, closer := observe.ProgressWithCount(ctx, header, message, int64(count)) close(ch) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index f2e68aec5..3a35eb349 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -238,7 +238,7 @@ func produceBackupDataCollections( metadata []data.Collection, ctrlOpts control.Options, ) ([]data.Collection, error) { - complete, closer := observe.MessageWithCompletion("Discovering items to backup:") + complete, closer := observe.MessageWithCompletion(ctx, "Discovering items to backup") defer func() { complete <- struct{}{} close(complete) @@ -492,7 +492,7 @@ func consumeBackupDataCollections( backupID model.StableID, isIncremental bool, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) { - complete, closer := observe.MessageWithCompletion("Backing up data:") + complete, closer := observe.MessageWithCompletion(ctx, "Backing up data") defer func() { complete <- struct{}{} close(complete) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index c068d888e..30770bdf5 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -94,7 +94,7 @@ func connectToM365( sel selectors.Selector, acct account.Account, ) (*connector.GraphConnector, error) { - complete, closer := observe.MessageWithCompletion("Connecting to M365:") + complete, closer := observe.MessageWithCompletion(ctx, "Connecting to M365") defer func() { complete <- struct{}{} close(complete) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index b4713f57c..f7505fa7d 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -159,9 +159,9 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De return nil, err } - observe.Message(fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)) + observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)) - kopiaComplete, closer := observe.MessageWithCompletion("Enumerating items in repository:") + kopiaComplete, closer := observe.MessageWithCompletion(ctx, "Enumerating items in repository") defer closer() defer close(kopiaComplete) @@ -183,7 +183,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De return nil, opStats.readErr } - restoreComplete, closer := observe.MessageWithCompletion("Restoring data:") + restoreComplete, closer := observe.MessageWithCompletion(ctx, "Restoring data") defer closer() defer close(restoreComplete) diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index e7d2a3c56..f8559759f 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -154,7 +154,7 @@ func Connect( // their output getting clobbered (#1720) defer observe.Complete() - complete, closer := observe.MessageWithCompletion("Connecting to repository:") + complete, closer := observe.MessageWithCompletion(ctx, "Connecting to repository") defer closer() defer close(complete)