From 279a6dab3cb20395f61f7cf30190d4740506c34a Mon Sep 17 00:00:00 2001
From: Keepers
Date: Thu, 11 Jan 2024 15:42:10 -0700
Subject: [PATCH] normalize observe usage, ensure deferred closers (#5006)

Some housecleaning before reviewing deadlock issues.

---

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :broom: Tech Debt/Cleanup

#### Test Plan

- [x] :zap: Unit test
---
 src/cli/export/export.go                      | 39 ++++++++++-------
 .../m365/collection/drive/collection.go       | 14 +++----
 src/internal/m365/collection/drive/restore.go | 17 ++++----
 .../m365/collection/exchange/backup.go        | 14 +++----
 .../m365/collection/exchange/collection.go    | 39 +++++++++--------
 .../m365/collection/exchange/restore.go       |  6 +--
 .../m365/collection/groups/collection.go      | 42 +++++++++----------
 src/internal/m365/collection/site/backup.go   | 14 ++++---
 .../m365/collection/site/collection.go        | 28 ++++++++-----
 src/internal/m365/service/groups/backup.go    | 32 +++++++-------
 src/internal/m365/service/onedrive/backup.go  | 15 +++----
 src/internal/observe/observe.go               | 35 ++++++++++------
 src/internal/observe/observe_test.go          |  3 +-
 src/internal/operations/backup.go             |  8 ++--
 src/internal/operations/export.go             | 13 ++----
 src/internal/operations/restore.go            |  8 ++--
 src/pkg/export/consume.go                     |  4 +-
 src/pkg/repository/data_providers.go          |  4 +-
 src/pkg/repository/repository.go              |  8 ++--
 19 files changed, 182 insertions(+), 161 deletions(-)

diff --git a/src/cli/export/export.go b/src/cli/export/export.go
index e618aeb2d..1b5a40d69 100644
--- a/src/cli/export/export.go
+++ b/src/cli/export/export.go
@@ -13,6 +13,7 @@ import (
 	"github.com/alcionai/corso/src/cli/utils"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/observe"
+	"github.com/alcionai/corso/src/internal/operations"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/dttm"
 	"github.com/alcionai/corso/src/pkg/export"
@@ -96,7 +97,7 @@ func runExport(
 		return Only(ctx, clues.Wrap(err, "Failed to initialize "+serviceName+" export"))
 	}
 
-	expColl, err := eo.Run(ctx)
+	collections, err := eo.Run(ctx)
 	if err != nil {
 		if errors.Is(err, data.ErrNotFound) {
 			return Only(ctx, clues.New("Backup or backup details missing for id "+backupID))
 		}
 
 		return Only(ctx, clues.Wrap(err, "Failed to run "+serviceName+" export"))
 	}
 
-	// It would be better to give a progressbar than a spinner, but we
-	// have any way of knowing how many files are available as of now.
-	diskWriteComplete := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Writing data to disk")
-
-	err = export.ConsumeExportCollections(ctx, exportLocation, expColl, eo.Errors)
-
-	// The progressbar has to be closed before we move on as the Infof
-	// below flushes progressbar to prevent clobbering the output and
-	// that causes the entire export operation to stall indefinitely.
-	// https://github.com/alcionai/corso/blob/8102523dc62c001b301cd2ab4e799f86146ab1a0/src/cli/print/print.go#L151
-	close(diskWriteComplete)
-
-	if err != nil {
-		return Only(ctx, err)
+	if err = showExportProgress(ctx, eo, collections, exportLocation); err != nil {
+		return err
 	}
 
 	if len(eo.Errors.Recovered()) > 0 {
@@ -142,3 +131,23 @@
 
 	return nil
 }
+
+// showExportProgress is a slim wrapper that lets us defer the progress bar closure within the expected scope.
+func showExportProgress(
+	ctx context.Context,
+	op operations.ExportOperation,
+	collections []export.Collectioner,
+	exportLocation string,
+) error {
+	// It would be better to show a progress bar than a spinner, but we don't
+	// have any way of knowing how many files are available as of now.
+	progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Writing data to disk")
+	defer close(progressMessage)
+
+	err := export.ConsumeExportCollections(ctx, exportLocation, collections, op.Errors)
+	if err != nil {
+		return Only(ctx, err)
+	}
+
+	return nil
+}
diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go
index 0c578d6ad..27f01ce44 100644
--- a/src/internal/m365/collection/drive/collection.go
+++ b/src/internal/m365/collection/drive/collection.go
@@ -462,14 +462,12 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
 		return
 	}
 
-	displayPath := oc.handler.FormatDisplayPath(oc.driveName, parentPath)
-
-	folderProgress := observe.ProgressWithCount(
+	progressMessage := observe.ProgressWithCount(
 		ctx,
 		observe.ItemQueueMsg,
-		path.NewElements(displayPath),
+		path.NewElements(oc.handler.FormatDisplayPath(oc.driveName, parentPath)),
 		int64(len(oc.driveItems)))
-	defer close(folderProgress)
+	defer close(progressMessage)
 
 	semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item())
 	defer close(semaphoreCh)
@@ -500,7 +498,7 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
 				oc.ctrl.ItemExtensionFactory,
 				errs)
 
-			folderProgress <- struct{}{}
+			progressMessage <- struct{}{}
 		}(item)
 	}
 
@@ -546,7 +544,7 @@ func (lig *lazyItemGetter) GetData(
 		lig.info.Extension.Data = extData.Data
 
 		// display/log the item download
-		progReader, _ := observe.ItemProgress(
+		progReader := observe.ItemProgress(
 			ctx,
 			extRc,
 			observe.ItemBackupMsg,
@@ -656,7 +654,7 @@ func (oc *Collection) streamDriveItem(
 	}
 
 	metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-		progReader, _ := observe.ItemProgress(
+		progReader := observe.ItemProgress(
 			ctx,
 			itemMeta,
 			observe.ItemBackupMsg,
diff --git a/src/internal/m365/collection/drive/restore.go b/src/internal/m365/collection/drive/restore.go
index f5a18a091..8f7059491 100644
--- a/src/internal/m365/collection/drive/restore.go
+++ b/src/internal/m365/collection/drive/restore.go
@@ -795,9 +795,8 @@ func restoreFile(
 	}
 
 	var (
-		written          int64
-		progReader       io.ReadCloser
-		closeProgressBar func()
+		written        int64
+		progressReader io.ReadCloser
 	)
 
 	// This is just to retry file upload, the uploadSession creation is
@@ -824,21 +823,23 @@
 			iReader = itemData.ToReader()
 		}
 
-		progReader, closeProgressBar = observe.ItemProgress(
+		progressReader = observe.ItemProgress(
 			ctx,
 			iReader,
 			observe.ItemRestoreMsg,
 			clues.Hide(pname),
 			ss.Size())
+		defer progressReader.Close()
 
 		// Upload the stream data
-		written, err = io.CopyBuffer(w, progReader, copyBuffer)
+		written, err = io.CopyBuffer(w, progressReader, copyBuffer)
 		if err == nil {
 			break
 		}
 
-		// clear out the progress bar immediately on error
-		closeProgressBar()
+		// Close the progress bar immediately on error, else we might deadlock.
+		// It's safe to call Close on the reader multiple times.
+ progressReader.Close() // refresh the io.Writer to restart the upload // TODO: @vkamra verify if var session is the desired input @@ -853,8 +854,6 @@ func restoreFile( return "", details.ItemInfo{}, clues.Wrap(err, "uploading file") } - defer closeProgressBar() - dii := ir.AugmentItemInfo( details.ItemInfo{}, rcc.ProtectedResource, diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index ccb83bf7b..240e2491b 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -60,16 +60,14 @@ func CreateCollections( return nil, clues.NewWC(ctx, "unsupported backup category type") } - pcfg := observe.ProgressCfg{ - Indent: 1, - CompletionMessage: func() string { return fmt.Sprintf("(found %d folders)", len(collections)) }, - } - foldersComplete := observe.MessageWithCompletion( + progressMessage := observe.MessageWithCompletion( ctx, - pcfg, + observe.ProgressCfg{ + Indent: 1, + CompletionMessage: func() string { return fmt.Sprintf("(found %d folders)", len(collections)) }, + }, qp.Category.HumanString()) - - defer close(foldersComplete) + defer close(progressMessage) rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID()) diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index 631119e5b..b05764219 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -188,12 +188,11 @@ func (col *prefetchCollection) streamItems( errs *fault.Bus, ) { var ( - success int64 - totalBytes int64 - wg sync.WaitGroup - colProgress chan<- struct{} - - user = col.user + success int64 + totalBytes int64 + wg sync.WaitGroup + progressMessage chan<- struct{} + user = col.user ) ctx = clues.Add( @@ -216,11 +215,11 @@ func (col *prefetchCollection) streamItems( }() if len(col.added)+len(col.removed) > 0 { - colProgress = observe.CollectionProgress( + progressMessage = observe.CollectionProgress( ctx, col.Category().HumanString(), col.LocationPath().Elements()) - defer close(colProgress) + defer close(progressMessage) } semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch) @@ -244,8 +243,8 @@ func (col *prefetchCollection) streamItems( atomic.AddInt64(&success, 1) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id) } @@ -333,8 +332,8 @@ func (col *prefetchCollection) streamItems( atomic.AddInt64(&success, 1) atomic.AddInt64(&totalBytes, info.Size) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id) } @@ -389,8 +388,8 @@ func (col *lazyFetchCollection) streamItems( errs *fault.Bus, ) { var ( - success int64 - colProgress chan<- struct{} + success int64 + progressMessage chan<- struct{} user = col.user ) @@ -408,11 +407,11 @@ func (col *lazyFetchCollection) streamItems( }() if len(col.added)+len(col.removed) > 0 { - colProgress = observe.CollectionProgress( + progressMessage = observe.CollectionProgress( ctx, col.Category().HumanString(), col.LocationPath().Elements()) - defer close(colProgress) + defer close(progressMessage) } // delete all removed items @@ -421,8 +420,8 @@ func (col *lazyFetchCollection) streamItems( atomic.AddInt64(&success, 1) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } } @@ -458,8 +457,8 @@ func (col 
*lazyFetchCollection) streamItems( atomic.AddInt64(&success, 1) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } } } diff --git a/src/internal/m365/collection/exchange/restore.go b/src/internal/m365/collection/exchange/restore.go index 038c98ae7..dd4bb372b 100644 --- a/src/internal/m365/collection/exchange/restore.go +++ b/src/internal/m365/collection/exchange/restore.go @@ -47,11 +47,11 @@ func RestoreCollection( category = fullPath.Category() ) - colProgress := observe.CollectionProgress( + progressMessage := observe.CollectionProgress( ctx, category.HumanString(), fullPath.Folder(false)) - defer close(colProgress) + defer close(progressMessage) for { select { @@ -119,7 +119,7 @@ func RestoreCollection( logger.Ctx(ictx).Infow("accounting for restored item", "error", err) } - colProgress <- struct{}{} + progressMessage <- struct{}{} } } } diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index 3bd23930e..9c3914b18 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -128,11 +128,11 @@ func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { var ( - streamedItems int64 - totalBytes int64 - wg sync.WaitGroup - colProgress chan<- struct{} - el = errs.Local() + streamedItems int64 + totalBytes int64 + wg sync.WaitGroup + progressMessage chan<- struct{} + el = errs.Local() ) ctx = clues.Add(ctx, "category", col.Category().String()) @@ -154,11 +154,11 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul }() if len(col.added)+len(col.removed) > 0 { - colProgress = observe.CollectionProgress( + progressMessage = observe.CollectionProgress( ctx, col.Category().HumanString(), col.LocationPath().Elements()) - defer close(colProgress) + defer close(progressMessage) } semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch) @@ -179,8 +179,8 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul atomic.AddInt64(&streamedItems, 1) col.Counter.Inc(count.StreamItemsRemoved) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id) } @@ -254,8 +254,8 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul col.Counter.Add(count.StreamBytesAdded, info.Size) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id) } @@ -294,10 +294,10 @@ func (col *lazyFetchCollection[C, I]) Items( func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { var ( - streamedItems int64 - wg sync.WaitGroup - colProgress chan<- struct{} - el = errs.Local() + streamedItems int64 + wg sync.WaitGroup + progressMessage chan<- struct{} + el = errs.Local() ) ctx = clues.Add(ctx, "category", col.Category().String()) @@ -319,11 +319,11 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau }() if len(col.added)+len(col.removed) > 0 { - colProgress = observe.CollectionProgress( + progressMessage = observe.CollectionProgress( ctx, col.Category().HumanString(), col.LocationPath().Elements()) - defer close(colProgress) + defer close(progressMessage) } semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch) @@ -344,8 +344,8 @@ func 
(col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau atomic.AddInt64(&streamedItems, 1) col.Counter.Inc(count.StreamItemsRemoved) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id) } @@ -386,8 +386,8 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau atomic.AddInt64(&streamedItems, 1) - if colProgress != nil { - colProgress <- struct{}{} + if progressMessage != nil { + progressMessage <- struct{}{} } }(id, modTime) } diff --git a/src/internal/m365/collection/site/backup.go b/src/internal/m365/collection/site/backup.go index 0a09f0be7..9b7a03ee1 100644 --- a/src/internal/m365/collection/site/backup.go +++ b/src/internal/m365/collection/site/backup.go @@ -58,12 +58,14 @@ func CollectLibraries( path.LibrariesCategory.HumanString(), stdpath.Base(bpc.ProtectedResource.Name())) - pcfg := observe.ProgressCfg{ - Indent: 1, - CompletionMessage: func() string { return fmt.Sprintf("(found %d items)", colls.NumItems) }, - } - progressBar := observe.MessageWithCompletion(ctx, pcfg, msg) - close(progressBar) + progressMessage := observe.MessageWithCompletion( + ctx, + observe.ProgressCfg{ + Indent: 1, + CompletionMessage: func() string { return fmt.Sprintf("(found %d items)", colls.NumItems) }, + }, + msg) + close(progressMessage) odcs, canUsePreviousBackup, err := colls.Get(ctx, bpc.MetadataCollections, ssmb, errs) if err != nil { diff --git a/src/internal/m365/collection/site/collection.go b/src/internal/m365/collection/site/collection.go index 1bfaf8979..39e5fd27c 100644 --- a/src/internal/m365/collection/site/collection.go +++ b/src/internal/m365/collection/site/collection.go @@ -184,9 +184,11 @@ func (pc *prefetchCollection) streamLists( pc.fullPath, &metrics) - // TODO: Insert correct ID for CollectionProgress - progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders()) - defer close(progress) + progressMessage := observe.CollectionProgress( + ctx, + pc.fullPath.Category().HumanString(), + pc.fullPath.Folders()) + defer close(progressMessage) semaphoreCh := make(chan struct{}, fetchChannelSize) defer close(semaphoreCh) @@ -204,7 +206,7 @@ func (pc *prefetchCollection) streamLists( go pc.handleListItems( ctx, semaphoreCh, - progress, + progressMessage, &wg, listID, &objects, @@ -238,8 +240,11 @@ func (pc *prefetchCollection) streamPages( &metrics) // TODO: Insert correct ID for CollectionProgress - progress := observe.CollectionProgress(ctx, pc.fullPath.Category().HumanString(), pc.fullPath.Folders()) - defer close(progress) + progressMessage := observe.CollectionProgress( + ctx, + pc.fullPath.Category().HumanString(), + pc.fullPath.Folders()) + defer close(progressMessage) betaService := pc.betaService if betaService == nil { @@ -299,7 +304,7 @@ func (pc *prefetchCollection) streamPages( } pc.stream[path.PagesCategory] <- item - progress <- struct{}{} + progressMessage <- struct{}{} } } @@ -453,8 +458,11 @@ func (lc *lazyFetchCollection) streamItems( lc.fullPath, &metrics) - progress := observe.CollectionProgress(ctx, lc.fullPath.Category().HumanString(), lc.fullPath.Folders()) - defer close(progress) + progressMessage := observe.CollectionProgress( + ctx, + lc.fullPath.Category().HumanString(), + lc.fullPath.Folders()) + defer close(progressMessage) for listID, modTime := range lc.items { if el.Failure() != nil { @@ -475,7 +483,7 @@ func (lc *lazyFetchCollection) streamItems( metrics.Successes++ - progress <- struct{}{} + 
progressMessage <- struct{}{} } metrics.Objects += int(numLists) diff --git a/src/internal/m365/service/groups/backup.go b/src/internal/m365/service/groups/backup.go index f930625c3..4dcae200a 100644 --- a/src/internal/m365/service/groups/backup.go +++ b/src/internal/m365/service/groups/backup.go @@ -264,14 +264,15 @@ func backupChannels( canUsePreviousBackup bool ) - pcfg := observe.ProgressCfg{ - Indent: 1, - // TODO(meain): Use number of messages and not channels - CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(colls)) }, - } - progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString()) - - defer close(progressBar) + progressMessage := observe.MessageWithCompletion( + ctx, + observe.ProgressCfg{ + Indent: 1, + // TODO(meain): Use number of messages and not channels + CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(colls)) }, + }, + scope.Category().PathType().HumanString()) + defer close(progressMessage) if !api.IsTeam(ctx, bc.group) { return colls, nil @@ -325,13 +326,14 @@ func backupConversations( colls []data.BackupCollection ) - pcfg := observe.ProgressCfg{ - Indent: 1, - CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(colls)) }, - } - progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString()) - - defer close(progressBar) + progressMessage := observe.MessageWithCompletion( + ctx, + observe.ProgressCfg{ + Indent: 1, + CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(colls)) }, + }, + scope.Category().PathType().HumanString()) + defer close(progressMessage) useLazyReader := !bc.producerConfig.Options.ToggleFeatures.DisableLazyItemReader diff --git a/src/internal/m365/service/onedrive/backup.go b/src/internal/m365/service/onedrive/backup.go index e373a66b2..0c19020ed 100644 --- a/src/internal/m365/service/onedrive/backup.go +++ b/src/internal/m365/service/onedrive/backup.go @@ -62,13 +62,14 @@ func ProduceBackupCollections( bpc.Options, counter) - pcfg := observe.ProgressCfg{ - Indent: 1, - CompletionMessage: func() string { return fmt.Sprintf("(found %d files)", nc.NumFiles) }, - } - progressBar := observe.MessageWithCompletion(ctx, pcfg, path.FilesCategory.HumanString()) - - defer close(progressBar) + progressMessage := observe.MessageWithCompletion( + ctx, + observe.ProgressCfg{ + Indent: 1, + CompletionMessage: func() string { return fmt.Sprintf("(found %d files)", nc.NumFiles) }, + }, + path.FilesCategory.HumanString()) + defer close(progressMessage) odcs, canUsePreviousBackup, err = nc.Get(ctx, bpc.MetadataCollections, ssmb, errs) if err != nil { diff --git a/src/internal/observe/observe.go b/src/internal/observe/observe.go index f8da6220d..f69deaffe 100644 --- a/src/internal/observe/observe.go +++ b/src/internal/observe/observe.go @@ -178,6 +178,10 @@ type ProgressCfg struct { CompletionMessage func() string } +func DefaultCfg() ProgressCfg { + return ProgressCfg{} +} + // Message is used to display a progress message func Message(ctx context.Context, cfg ProgressCfg, msgs ...any) { var ( @@ -351,26 +355,29 @@ func (ac *autoCloser) Read(p []byte) (n int, err error) { } func (ac *autoCloser) Close() error { - if !ac.closed { - ac.closed = true - ac.close() + if ac.closed { + return nil } + ac.closed = true + ac.close() + return ac.rc.Close() } // ItemProgress tracks the display of an item in a folder by counting the bytes // read through the provided readcloser, 
up until the byte count matches // the totalBytes. +// // The progress bar will close automatically when the reader closes. If an early -// close is needed due to abort or other issue, the returned func can be used. +// close is needed due to abort or other issue, the reader can be closed manually. func ItemProgress( ctx context.Context, rc io.ReadCloser, header string, iname any, totalBytes int64, -) (io.ReadCloser, func()) { +) io.ReadCloser { var ( obs = getObserver(ctx) plain = plainString(iname) @@ -383,7 +390,7 @@ func ItemProgress( if obs.hidden() || rc == nil { defer log.Debug("done - " + header) - return rc, func() {} + return rc } obs.wg.Add(1) @@ -410,22 +417,24 @@ func ItemProgress( closer := &autoCloser{rc: bar.ProxyReader(rc)} closer.close = func() { - closer.closed = true bar.SetTotal(-1, true) bar.Abort(true) } - return closer, closer.close + return closer } // ItemSpinner is similar to ItemProgress, but for use in cases where // we don't know the file size but want to show progress. +// +// The progress bar will close automatically when the reader closes. If an early +// close is needed due to abort or other issue, the reader can be closed manually. func ItemSpinner( ctx context.Context, rc io.ReadCloser, header string, iname any, -) (io.ReadCloser, func()) { +) io.ReadCloser { var ( obs = getObserver(ctx) plain = plainString(iname) @@ -436,7 +445,7 @@ func ItemSpinner( if obs.hidden() || rc == nil { defer log.Debug("done - " + header) - return rc, func() {} + return rc } obs.wg.Add(1) @@ -459,12 +468,14 @@ func ItemSpinner( log.Debug("done - " + header) })() - abort := func() { + closer := &autoCloser{rc: bar.ProxyReader(rc)} + + closer.close = func() { bar.SetTotal(-1, true) bar.Abort(true) } - return bar.ProxyReader(rc), abort + return closer } // ProgressWithCount tracks the display of a bar that tracks the completion diff --git a/src/internal/observe/observe_test.go b/src/internal/observe/observe_test.go index 533b0d760..42fcc6dc9 100644 --- a/src/internal/observe/observe_test.go +++ b/src/internal/observe/observe_test.go @@ -45,14 +45,13 @@ func (suite *ObserveProgressUnitSuite) TestObserve_ItemProgress() { ctx = SeedObserver(ctx, &recorder, config{}) from := make([]byte, 100) - prog, abort := ItemProgress( + prog := ItemProgress( ctx, io.NopCloser(bytes.NewReader(from)), "folder", tst, 100) require.NotNil(t, prog) - require.NotNil(t, abort) var i int diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index c91c3b43b..a4a1508a6 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -522,8 +522,8 @@ func produceBackupDataCollections( counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) { - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Discovering items to backup") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Discovering items to backup") + defer close(progressMessage) bpc := inject.BackupProducerConfig{ LastBackupVersion: lastBackupVersion, @@ -559,8 +559,8 @@ func consumeBackupCollections( "collection_source", "operations", "snapshot_type", "item data") - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Backing up data") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Backing up data") + defer close(progressMessage) tags := map[string]string{ kopia.TagBackupID: 
string(backupID), diff --git a/src/internal/operations/export.go b/src/internal/operations/export.go index 3a32f66bd..7207e9635 100644 --- a/src/internal/operations/export.go +++ b/src/internal/operations/export.go @@ -249,16 +249,14 @@ func (op *ExportOperation) do( observe.ProgressCfg{}, fmt.Sprintf("Discovered %d items in backup %s to export", len(paths), op.BackupID)) - kopiaComplete := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Enumerating items in repository") - defer close(kopiaComplete) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Enumerating items in repository") + defer close(progressMessage) dcs, err := op.kopia.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) if err != nil { return nil, clues.Wrap(err, "producing collections to export") } - kopiaComplete <- struct{}{} - ctx = clues.Add(ctx, "coll_count", len(dcs)) // should always be 1, since backups are 1:1 with resourceOwners. @@ -340,11 +338,8 @@ func produceExportCollections( exportStats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { - complete := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Preparing export") - defer func() { - complete <- struct{}{} - close(complete) - }() + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Preparing export") + defer close(progressMessage) ctx, end := diagnostics.Span(ctx, "m365:export") defer end() diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 48551be78..b9a0d3878 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -282,8 +282,8 @@ func (op *RestoreOperation) do( observe.ProgressCfg{}, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)) - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Enumerating items in repository") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Enumerating items in repository") + defer close(progressMessage) dcs, err := op.kopia.ProduceRestoreCollections( ctx, @@ -394,8 +394,8 @@ func consumeRestoreCollections( return nil, nil, clues.New("no data collections to restore") } - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Restoring data") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Restoring data") + defer close(progressMessage) ctx, end := diagnostics.Span(ctx, "operations:restore") defer end() diff --git a/src/pkg/export/consume.go b/src/pkg/export/consume.go index 19e27baf5..c4c5711e3 100644 --- a/src/pkg/export/consume.go +++ b/src/pkg/export/consume.go @@ -50,14 +50,14 @@ func writeItem(ctx context.Context, item Item, folder string) error { name := item.Name fpath := filepath.Join(folder, name) - progReader, pclose := observe.ItemSpinner( + progReader := observe.ItemSpinner( ctx, item.Body, observe.ItemExportMsg, clues.Hide(name)) defer item.Body.Close() - defer pclose() + defer progReader.Close() err := os.MkdirAll(folder, os.ModePerm) if err != nil { diff --git a/src/pkg/repository/data_providers.go b/src/pkg/repository/data_providers.go index 81b6e9523..09a4fe900 100644 --- a/src/pkg/repository/data_providers.go +++ b/src/pkg/repository/data_providers.go @@ -90,8 +90,8 @@ func connectToM365( return ctrl, nil } - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Connecting to M365") - defer 
close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Connecting to M365") + defer close(progressMessage) ctrl, err := m365.NewController( ctx, diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index f69084ec3..08aa1e327 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -191,8 +191,8 @@ func (r *repository) Connect(ctx context.Context, cfg ConnConfig) (err error) { return clues.Stack(err) } - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Connecting to repository") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Connecting to repository") + defer close(progressMessage) if err := r.setupKopia(ctx, ctrlRepo.Retention{}, false); err != nil { return clues.Stack(err) @@ -214,8 +214,8 @@ func (r *repository) UpdatePassword(ctx context.Context, password string) (err e } }() - progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Connecting to repository") - defer close(progressBar) + progressMessage := observe.MessageWithCompletion(ctx, observe.DefaultCfg(), "Connecting to repository") + defer close(progressMessage) repoNameHash, err := r.GenerateHashForRepositoryConfigFileName() if err != nil {
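
The shape of the `observe.ItemProgress`/`ItemSpinner` change above: instead of returning `(io.ReadCloser, func())` and trusting every caller to invoke the abort func on exactly one path, the progress teardown now rides along inside the returned `io.ReadCloser`, with `autoCloser` making `Close` idempotent. A minimal standalone sketch of that pattern, with hypothetical names (`idempotentCloser` is illustrative, not part of this patch):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// idempotentCloser mirrors the autoCloser pattern in observe.go: it wraps a
// ReadCloser so Close may be called any number of times, running the
// progress-bar teardown exactly once.
type idempotentCloser struct {
	rc     io.ReadCloser
	closed bool
	close  func()
}

func (ic *idempotentCloser) Read(p []byte) (int, error) { return ic.rc.Read(p) }

func (ic *idempotentCloser) Close() error {
	if ic.closed {
		return nil
	}

	ic.closed = true
	ic.close() // e.g. aborting the mpb bar

	return ic.rc.Close()
}

func main() {
	pr := &idempotentCloser{
		rc:    io.NopCloser(bytes.NewReader([]byte("item data"))),
		close: func() { fmt.Println("progress bar torn down") },
	}

	// The deferred Close is always safe...
	defer pr.Close()

	if _, err := io.CopyBuffer(io.Discard, pr, make([]byte, 32)); err != nil {
		// ...even when an eager Close already ran on the error path.
		pr.Close()
	}
}
```

That double-close safety is what lets `restoreFile` both `defer progressReader.Close()` and close eagerly inside its retry loop without stalling the bar.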
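The other normalization is that every `MessageWithCompletion`/`CollectionProgress` channel is now released with `defer close(progressMessage)` instead of a close on one hand-picked path. If any return path skips the close, the goroutine draining the channel blocks forever — the deadlock family this cleanup precedes. A rough self-contained sketch of why the deferred close matters, with `messageWithCompletion` as a hypothetical stand-in for the real observe implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// messageWithCompletion is a hypothetical stand-in for
// observe.MessageWithCompletion: it announces the message, drains ticks from
// the returned channel, and reports completion once the channel is closed.
func messageWithCompletion(wg *sync.WaitGroup, msg string) chan<- struct{} {
	ch := make(chan struct{})

	wg.Add(1)

	go func() {
		defer wg.Done()

		fmt.Println(msg + "...")

		items := 0
		for range ch { // blocks forever if the caller never closes ch
			items++
		}

		fmt.Printf("%s: done (%d items)\n", msg, items)
	}()

	return ch
}

func doWork(progress chan<- struct{}) error {
	for i := 0; i < 3; i++ {
		progress <- struct{}{}
	}

	// an early error return here would still trigger the deferred close below
	return nil
}

func main() {
	var wg sync.WaitGroup

	progressMessage := messageWithCompletion(&wg, "Writing data to disk")

	// Deferring the close, rather than calling it on one success path,
	// guarantees the draining goroutine always terminates (defers run LIFO:
	// close first, then the wait).
	defer wg.Wait()
	defer close(progressMessage)

	if err := doWork(progressMessage); err != nil {
		fmt.Println("work failed:", err)
	}
}
```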