From 5c4d57b416fabf007fc03632bc72e296e1015e06 Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Fri, 2 Jun 2023 10:26:11 +0530 Subject: [PATCH] Parallelize restores within a collection for OneDrive (#3492) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should massively speed up restores of collections that contain many items. It will not help much when we have a lot of collections with only a few items each. Numbers 🔢: - Restoring ~7000 files, mostly small, totaling 1.5GB - Sequential: ~70m - Parallel: ~50m - Restoring 1200 50MB files - Sequential: 4h 45m - Parallel: <40m --- #### Does this PR need a docs update or release note? - [x] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [ ] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * https://github.com/alcionai/corso/issues/3011 * closes https://github.com/alcionai/corso/issues/3536 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [x] :green_heart: E2E --- CHANGELOG.md | 1 + src/internal/connector/data_collections.go | 47 +++- .../connector/exchange/data_collections.go | 3 +- .../exchange/exchange_data_collection.go | 5 +- .../connector/exchange/service_restore.go | 3 +- .../connector/graph/concurrency_middleware.go | 2 + src/internal/connector/graph/consts.go | 25 +- src/internal/connector/onedrive/collection.go | 9 +- .../connector/onedrive/collections.go | 3 +- src/internal/connector/onedrive/item.go | 21 -- src/internal/connector/onedrive/item_test.go | 6 +- src/internal/connector/onedrive/restore.go | 225 ++++++++++++------ .../connector/sharepoint/collection.go | 3 +- .../connector/sharepoint/data_collections.go | 3 +- src/internal/connector/sharepoint/restore.go | 3 + src/internal/observe/observe.go | 56 +++-- src/internal/observe/observe_test.go | 43 +--- src/internal/operations/backup.go | 6 +- src/internal/operations/restore.go | 6 +- src/pkg/repository/repository.go | 6 +- 20 files changed, 297 insertions(+), 179 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3f47d973..a877f4d3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added ProtectedResourceName to the backup list json output. ProtectedResourceName holds either a UPN or a WebURL, depending on the resource type. - Rework base selection logic for incremental backups so it's more likely to find a valid base. +- Improve OneDrive restore performance by parallelizing item restores within a collection ### Fixed - Fix Exchange folder cache population error when parent folder isn't found.
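The restore changes further down fan each item of a collection out to its own goroutine, capped by a buffered-channel semaphore. A minimal sketch of that general shape, with hypothetical names (`restoreAll`, `restoreOne`, `maxParallel`) rather than the PR's actual functions:

```go
// Minimal sketch of the bounded-concurrency pattern this PR applies to item
// restores inside a single collection. restoreAll, restoreOne, and maxParallel
// are hypothetical stand-ins; the real logic lives in
// src/internal/connector/onedrive/restore.go.
package main

import (
	"context"
	"fmt"
	"sync"
)

func restoreOne(ctx context.Context, item string) {
	// stand-in for creating the upload session and copying the item's bytes
	fmt.Println("restored", item)
}

func restoreAll(ctx context.Context, items []string, maxParallel int) {
	var (
		wg  sync.WaitGroup
		sem = make(chan struct{}, maxParallel) // semaphore: caps in-flight restores
	)

	for _, item := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxParallel restores are running

		go func(item string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next item

			restoreOne(ctx, item)
		}(item)
	}

	wg.Wait() // the collection only completes when every item goroutine returns
}

func main() {
	restoreAll(context.Background(), []string{"a.txt", "b.txt", "c.txt"}, 2)
}
```

Because the fan-out happens per collection, the speedup shows up for collections with many items and does little for backups made of many small collections, which matches the numbers above.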
diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index c3591a0e6..10f4fab2b 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -3,6 +3,7 @@ package connector import ( "context" "strings" + "sync" "github.com/alcionai/clues" @@ -26,6 +27,13 @@ import ( "github.com/alcionai/corso/src/pkg/selectors" ) +const ( + // copyBufferSize is used for chunked upload + // Microsoft recommends 5-10MB buffers + // https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession?view=graph-rest-1.0#best-practices + copyBufferSize = 5 * 1024 * 1024 +) + // --------------------------------------------------------------------------- // Data Collections // --------------------------------------------------------------------------- @@ -256,13 +264,46 @@ func (gc *GraphConnector) ConsumeRestoreCollections( return nil, clues.Wrap(err, "malformed azure credentials") } + // Buffer pool for uploads + pool := sync.Pool{ + New: func() interface{} { + b := make([]byte, copyBufferSize) + return &b + }, + } + switch sels.Service { case selectors.ServiceExchange: - status, err = exchange.RestoreCollections(ctx, creds, gc.Discovery, gc.Service, dest, dcs, deets, errs) + status, err = exchange.RestoreCollections(ctx, + creds, + gc.Discovery, + gc.Service, + dest, + dcs, + deets, + errs) case selectors.ServiceOneDrive: - status, err = onedrive.RestoreCollections(ctx, creds, backupVersion, gc.Service, dest, opts, dcs, deets, errs) + status, err = onedrive.RestoreCollections(ctx, + creds, + backupVersion, + gc.Service, + dest, + opts, + dcs, + deets, + &pool, + errs) case selectors.ServiceSharePoint: - status, err = sharepoint.RestoreCollections(ctx, backupVersion, creds, gc.Service, dest, opts, dcs, deets, errs) + status, err = sharepoint.RestoreCollections(ctx, + backupVersion, + creds, + gc.Service, + dest, + opts, + dcs, + deets, + &pool, + errs) default: err = clues.Wrap(clues.New(sels.Service.String()), "service not supported") } diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index a179156c6..0ee250b2e 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -268,10 +268,9 @@ func createCollections( return nil, clues.New("unsupported backup category type").WithClues(ctx) } - foldersComplete, closer := observe.MessageWithCompletion( + foldersComplete := observe.MessageWithCompletion( ctx, observe.Bulletf("%s", qp.Category)) - defer closer() defer close(foldersComplete) rootFolder, cc := handler.NewContainerCache(user.ID()) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 83c000b2c..921952b88 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -163,14 +163,11 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { }() if len(col.added)+len(col.removed) > 0 { - var closer func() - colProgress, closer = observe.CollectionProgress( + colProgress = observe.CollectionProgress( ctx, col.fullPath.Category().String(), col.LocationPath().Elements()) - go closer() - defer func() { close(colProgress) }() diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index 4e77ef5cc..d8680d92b 100644 --- 
a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -145,11 +145,10 @@ func restoreCollection( category = fullPath.Category() ) - colProgress, closer := observe.CollectionProgress( + colProgress := observe.CollectionProgress( ctx, category.String(), fullPath.Folder(false)) - defer closer() defer close(colProgress) for { diff --git a/src/internal/connector/graph/concurrency_middleware.go b/src/internal/connector/graph/concurrency_middleware.go index ba2a08fa6..c70988f65 100644 --- a/src/internal/connector/graph/concurrency_middleware.go +++ b/src/internal/connector/graph/concurrency_middleware.go @@ -142,6 +142,8 @@ type limiterConsumptionKey string const limiterConsumptionCtxKey limiterConsumptionKey = "corsoGraphRateLimiterConsumption" const ( + // https://learn.microsoft.com/en-us/sharepoint/dev/general-development + // /how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#application-throttling defaultLC = 1 driveDefaultLC = 2 // limit consumption rate for single-item GETs requests, diff --git a/src/internal/connector/graph/consts.go b/src/internal/connector/graph/consts.go index 0438ab10c..00785f1fa 100644 --- a/src/internal/connector/graph/consts.go +++ b/src/internal/connector/graph/consts.go @@ -7,7 +7,15 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -const AttachmentChunkSize = 4 * 1024 * 1024 +const ( + AttachmentChunkSize = 4 * 1024 * 1024 + + // Upper limit on the number of concurrent uploads as we + // create buffer pools for each upload. This is not the actual + // number of uploads, but the max that can be specified. This is + // added as a safeguard in case we misconfigure the values. + maxConccurrentUploads = 20 +) // --------------------------------------------------------------------------- // item response AdditionalData @@ -44,6 +52,8 @@ type parallelism struct { collectionBuffer int // sets the parallelism of item population within a collection. item int + // sets the parallelism of concurrent uploads within a collection + itemUpload int } func (p parallelism) CollectionBufferSize() int { @@ -88,6 +98,18 @@ func (p parallelism) Item() int { return p.item } +func (p parallelism) ItemUpload() int { + if p.itemUpload == 0 { + return 1 + } + + if p.itemUpload > maxConccurrentUploads { + return maxConccurrentUploads + } + + return p.itemUpload +} + // returns low <= v <= high // if high < low, returns low <= v func isWithin(low, high, v int) bool { @@ -102,6 +124,7 @@ var sp = map[path.ServiceType]parallelism{ path.OneDriveService: { collectionBuffer: 5, item: 4, + itemUpload: 7, }, // sharepoint libraries are considered "onedrive" parallelism. // this only controls lists/pages. 
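Two guardrails keep that fan-out from ballooning memory: the sync.Pool added in data_collections.go hands each in-flight upload a reusable 5MB copy buffer, and ItemUpload() clamps the configured parallelism to a hard ceiling of 20 concurrent uploads. A rough sketch of how the two pieces interact; `clampItemUpload`, `maxConcurrentUploads`, and the loop body are illustrative stand-ins, not the repository's code:

```go
// Sketch of how the shared buffer pool and the upload-parallelism clamp fit
// together. The real constants live in connector/data_collections.go and
// connector/graph/consts.go.
package main

import (
	"fmt"
	"sync"
)

const (
	copyBufferSize       = 5 * 1024 * 1024 // 5MB chunks, per Graph upload guidance
	maxConcurrentUploads = 20              // safety cap on configured parallelism
)

// clampItemUpload mirrors the ItemUpload() accessor: never zero, never above
// the safety cap, so buffer memory stays bounded even if misconfigured.
func clampItemUpload(configured int) int {
	if configured == 0 {
		return 1
	}

	if configured > maxConcurrentUploads {
		return maxConcurrentUploads
	}

	return configured
}

func main() {
	pool := sync.Pool{
		New: func() interface{} {
			b := make([]byte, copyBufferSize)
			return &b // storing a pointer avoids re-boxing the slice header on each Put
		},
	}

	uploads := clampItemUpload(7) // e.g. OneDrive's itemUpload setting

	for i := 0; i < uploads; i++ {
		buf := pool.Get().(*[]byte) // reuse an existing 5MB buffer when possible
		// ... io.CopyBuffer(dst, src, *buf) happens here in the real code ...
		pool.Put(buf)
	}

	fmt.Printf("worst case: %d buffers x %dMB\n", uploads, copyBufferSize/(1024*1024))
}
```

Worst-case buffer memory is therefore roughly the clamped upload parallelism times 5MB, regardless of how many items the collection holds.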
diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 6f7cf3da8..f4f3807fb 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -439,12 +439,11 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { queuedPath = "/" + oc.driveName + queuedPath } - folderProgress, colCloser := observe.ProgressWithCount( + folderProgress := observe.ProgressWithCount( ctx, observe.ItemQueueMsg, path.NewElements(queuedPath), int64(len(oc.driveItems))) - defer colCloser() defer close(folderProgress) semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item()) @@ -535,13 +534,12 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { } // display/log the item download - progReader, closer := observe.ItemProgress( + progReader, _ := observe.ItemProgress( ctx, itemData, observe.ItemBackupMsg, clues.Hide(itemName+dataSuffix), itemSize) - go closer() return progReader, nil }) @@ -554,13 +552,12 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { } metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - progReader, closer := observe.ItemProgress( + progReader, _ := observe.ItemProgress( ctx, itemMeta, observe.ItemBackupMsg, clues.Hide(itemName+metaSuffix), int64(itemMetaSize)) - go closer() return progReader, nil }) diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index aabee1bcf..71e665053 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -283,8 +283,7 @@ func (c *Collections) Get( driveTombstones[driveID] = struct{}{} } - driveComplete, closer := observe.MessageWithCompletion(ctx, observe.Bulletf("files")) - defer closer() + driveComplete := observe.MessageWithCompletion(ctx, observe.Bulletf("files")) defer close(driveComplete) // Enumerate drives for the specified resourceOwner diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index ac05887cc..97578589f 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -346,27 +346,6 @@ func sharePointItemInfo(di models.DriveItemable, itemSize int64) *details.ShareP } } -// driveItemWriter is used to initialize and return an io.Writer to upload data for the specified item -// It does so by creating an upload session and using that URL to initialize an `itemWriter` -// TODO: @vkamra verify if var session is the desired input -func driveItemWriter( - ctx context.Context, - gs graph.Servicer, - driveID, itemID string, - itemSize int64, -) (io.Writer, error) { - ctx = clues.Add(ctx, "upload_item_id", itemID) - - r, err := api.PostDriveItem(ctx, gs, driveID, itemID) - if err != nil { - return nil, clues.Stack(err) - } - - iw := graph.NewLargeItemWriter(itemID, ptr.Val(r.GetUploadUrl()), itemSize) - - return iw, nil -} - // constructWebURL helper function for recreating the webURL // for the originating SharePoint site. Uses additional data map // from a models.DriveItemable that possesses a downloadURL within the map. 
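With driveItemWriter removed, its two steps (create the upload session, then stream the item through io.CopyBuffer into a large-item writer) are inlined at the call sites. The chunk size Graph sees is simply the copy buffer's length, which is why the item_test.go change below keeps a 32KB buffer to force multi-part uploads while restores use the 5MB pool buffers. A stdlib-only sketch of that relationship; `chunkRecorder` is a hypothetical stand-in for the Graph writer:

```go
// Shows how io.CopyBuffer's buffer length determines the size of each write
// sent to the upload writer. The 32KB/100KB figures mirror the item_test.go case.
package main

import (
	"bytes"
	"fmt"
	"io"
)

type chunkRecorder struct{ chunks []int }

func (c *chunkRecorder) Write(p []byte) (int, error) {
	c.chunks = append(c.chunks, len(p))
	return len(p), nil
}

func main() {
	payload := bytes.Repeat([]byte("x"), 100*1024) // 100KB, as in the test

	// Wrap the reader so it exposes only Read; otherwise io.CopyBuffer would
	// take the WriterTo fast path and skip the buffer entirely. The real item
	// readers are plain streams, so chunking always applies there.
	src := struct{ io.Reader }{bytes.NewReader(payload)}

	var w chunkRecorder

	buf := make([]byte, 32*1024) // 32KB copy buffer => multi-part upload

	if _, err := io.CopyBuffer(&w, src, buf); err != nil {
		panic(err)
	}

	fmt.Println("chunk sizes:", w.chunks) // [32768 32768 32768 4096]
}
```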
diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index a30037b43..5ad4f537a 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -189,9 +189,13 @@ func (suite *ItemIntegrationSuite) TestItemWriter() { // Initialize a 100KB mockDataProvider td, writeSize := mockDataReader(int64(100 * 1024)) - w, err := driveItemWriter(ctx, srv, test.driveID, ptr.Val(newItem.GetId()), writeSize) + itemID := ptr.Val(newItem.GetId()) + + r, err := api.PostDriveItem(ctx, srv, test.driveID, itemID) require.NoError(t, err, clues.ToCore(err)) + w := graph.NewLargeItemWriter(itemID, ptr.Val(r.GetUploadUrl()), writeSize) + // Using a 32 KB buffer for the copy allows us to validate the // multi-part upload. `io.CopyBuffer` will only write 32 KB at // a time diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go index 755496420..124668bb1 100644 --- a/src/internal/connector/onedrive/restore.go +++ b/src/internal/connector/onedrive/restore.go @@ -3,10 +3,13 @@ package onedrive import ( "context" "encoding/json" + "fmt" "io" "runtime/trace" "sort" "strings" + "sync" + "sync/atomic" "github.com/alcionai/clues" "github.com/pkg/errors" @@ -28,10 +31,10 @@ import ( "github.com/alcionai/corso/src/pkg/services/m365/api" ) -// copyBufferSize is used for chunked upload -// Microsoft recommends 5-10MB buffers -// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession?view=graph-rest-1.0#best-practices -const copyBufferSize = 5 * 1024 * 1024 +const ( + // Maximum number of retries for upload failures + maxUploadRetries = 3 +) type restoreCaches struct { Folders *folderCache @@ -59,6 +62,7 @@ func RestoreCollections( opts control.Options, dcs []data.RestoreCollection, deets *details.Builder, + pool *sync.Pool, errs *fault.Bus, ) (*support.ConnectorOperationStatus, error) { var ( @@ -104,6 +108,7 @@ func RestoreCollections( dest.ContainerName, deets, opts.RestorePermissions, + pool, errs) if err != nil { el.AddRecoverable(err) @@ -142,13 +147,18 @@ func RestoreCollection( restoreContainerName string, deets *details.Builder, restorePerms bool, + pool *sync.Pool, errs *fault.Bus, ) (support.CollectionMetrics, error) { var ( - metrics = support.CollectionMetrics{} - copyBuffer = make([]byte, copyBufferSize) - directory = dc.FullPath() - el = errs.Local() + metrics = support.CollectionMetrics{} + directory = dc.FullPath() + el = errs.Local() + metricsObjects int64 + metricsBytes int64 + metricsSuccess int64 + wg sync.WaitGroup + complete bool ) ctx, end := diagnostics.Span(ctx, "gc:drive:restoreCollection", diagnostics.Label("path", directory)) @@ -212,8 +222,30 @@ func RestoreCollection( caches.ParentDirToMeta[dc.FullPath().String()] = colMeta items := dc.Items(ctx, errs) + semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).ItemUpload()) + defer close(semaphoreCh) + + deetsLock := sync.Mutex{} + + updateDeets := func( + ctx context.Context, + repoRef path.Path, + locationRef *path.Builder, + updated bool, + info details.ItemInfo, + ) { + deetsLock.Lock() + defer deetsLock.Unlock() + + err = deets.Add(repoRef, locationRef, updated, info) + if err != nil { + // Not critical enough to need to stop restore operation. 
+ logger.CtxErr(ctx, err).Infow("adding restored item to details") + } + } + for { - if el.Failure() != nil { + if el.Failure() != nil || complete { break } @@ -223,62 +255,76 @@ func RestoreCollection( case itemData, ok := <-items: if !ok { - return metrics, nil + // We've processed all items in this collection, exit the loop + complete = true + break } - ictx := clues.Add(ctx, "restore_item_id", itemData.UUID()) + wg.Add(1) + semaphoreCh <- struct{}{} - itemPath, err := dc.FullPath().AppendItem(itemData.UUID()) - if err != nil { - el.AddRecoverable(clues.Wrap(err, "appending item to full path").WithClues(ictx)) - continue - } + go func(ctx context.Context, itemData data.Stream) { + defer wg.Done() + defer func() { <-semaphoreCh }() - itemInfo, skipped, err := restoreItem( - ictx, - creds, - dc, - backupVersion, - source, - service, - drivePath, - restoreFolderID, - copyBuffer, - caches, - restorePerms, - itemData, - itemPath) + copyBufferPtr := pool.Get().(*[]byte) + defer pool.Put(copyBufferPtr) - // skipped items don't get counted, but they can error - if !skipped { - metrics.Objects++ - metrics.Bytes += int64(len(copyBuffer)) - } + copyBuffer := *copyBufferPtr - if err != nil { - el.AddRecoverable(clues.Wrap(err, "restoring item")) - continue - } + ictx := clues.Add(ctx, "restore_item_id", itemData.UUID()) - if skipped { - logger.Ctx(ictx).With("item_path", itemPath).Debug("did not restore item") - continue - } + itemPath, err := dc.FullPath().AppendItem(itemData.UUID()) + if err != nil { + el.AddRecoverable(clues.Wrap(err, "appending item to full path").WithClues(ictx)) + return + } - err = deets.Add( - itemPath, - &path.Builder{}, // TODO: implement locationRef - true, - itemInfo) - if err != nil { - // Not critical enough to need to stop restore operation. 
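Since every item now runs in its own goroutine, the shared CollectionMetrics fields can no longer be incremented directly; the patch switches to atomic int64 counters that are folded back into the struct after wg.Wait(). A stripped-down sketch of that counting pattern (sizes and names are illustrative only):

```go
// Per-item goroutines report results without racing on the shared metrics
// struct: bump int64 counters atomically, then fold them in once after Wait.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type collectionMetrics struct {
	Objects   int
	Bytes     int64
	Successes int
}

func main() {
	var (
		wg        sync.WaitGroup
		objects   int64
		byteCount int64
		successes int64
	)

	itemSizes := []int64{1 << 10, 4 << 10, 16 << 10} // hypothetical item sizes

	for _, size := range itemSizes {
		wg.Add(1)

		go func(size int64) {
			defer wg.Done()

			// Plain metrics.Objects++ would race across goroutines.
			atomic.AddInt64(&objects, 1)
			atomic.AddInt64(&byteCount, size)
			atomic.AddInt64(&successes, 1)
		}(size)
	}

	wg.Wait() // all workers finished; single-threaded from here on

	m := collectionMetrics{
		Objects:   int(objects),
		Bytes:     byteCount,
		Successes: int(successes),
	}

	fmt.Printf("%+v\n", m)
}
```

The same data race concern is why details updates go through the new updateDeets helper, which serializes deets.Add behind a mutex shared by all item goroutines.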
- logger.CtxErr(ictx, err).Infow("adding restored item to details") - } + itemInfo, skipped, err := restoreItem( + ictx, + creds, + dc, + backupVersion, + source, + service, + drivePath, + restoreFolderID, + copyBuffer, + caches, + restorePerms, + itemData, + itemPath) - metrics.Successes++ + // skipped items don't get counted, but they can error + if !skipped { + atomic.AddInt64(&metricsObjects, 1) + atomic.AddInt64(&metricsBytes, int64(len(copyBuffer))) + } + + if err != nil { + el.AddRecoverable(clues.Wrap(err, "restoring item")) + return + } + + if skipped { + logger.Ctx(ictx).With("item_path", itemPath).Debug("did not restore item") + return + } + + // TODO: implement locationRef + updateDeets(ictx, itemPath, &path.Builder{}, true, itemInfo) + + atomic.AddInt64(&metricsSuccess, 1) + }(ctx, itemData) } } + wg.Wait() + + metrics.Objects = int(metricsObjects) + metrics.Bytes = metricsBytes + metrics.Successes = int(metricsSuccess) + return metrics, el.Failure() } @@ -308,6 +354,7 @@ func restoreItem( source, service, drivePath, + dc, restoreFolderID, copyBuffer, itemData) @@ -399,6 +446,7 @@ func restoreV0File( source driveSource, service graph.Servicer, drivePath *path.DrivePath, + fetcher fileFetcher, restoreFolderID string, copyBuffer []byte, itemData data.Stream, @@ -406,6 +454,7 @@ func restoreV0File( _, itemInfo, err := restoreData( ctx, service, + fetcher, itemData.UUID(), itemData, drivePath.DriveID, @@ -442,6 +491,7 @@ func restoreV1File( itemID, itemInfo, err := restoreData( ctx, service, + fetcher, trimmedName, itemData, drivePath.DriveID, @@ -525,6 +575,7 @@ func restoreV6File( itemID, itemInfo, err := restoreData( ctx, service, + fetcher, meta.FileName, itemData, drivePath.DriveID, @@ -673,6 +724,7 @@ func createRestoreFolders( func restoreData( ctx context.Context, service graph.Servicer, + fetcher fileFetcher, name string, itemData data.Stream, driveID, parentFolderID string, @@ -696,26 +748,65 @@ func restoreData( return "", details.ItemInfo{}, err } - // Get a drive item writer - w, err := driveItemWriter(ctx, service, driveID, ptr.Val(newItem.GetId()), ss.Size()) + itemID := ptr.Val(newItem.GetId()) + ctx = clues.Add(ctx, "upload_item_id", itemID) + + r, err := api.PostDriveItem(ctx, service, driveID, itemID) if err != nil { - return "", details.ItemInfo{}, err + return "", details.ItemInfo{}, clues.Wrap(err, "get upload session") } - iReader := itemData.ToReader() - progReader, closer := observe.ItemProgress( - ctx, - iReader, - observe.ItemRestoreMsg, - clues.Hide(name), - ss.Size()) + var written int64 - go closer() + // This retries only the file upload; creation of the upload session is + // not retried here. We need extra retry logic for the upload because + // we have to pull the file again from kopia: if an upload fails, we + // restart from scratch and try again. Graph does not "register" + // partial file uploads, so a failed upload leaves the file size at 0. + for i := 0; i <= maxUploadRetries; i++ { + // Initialize an io.Writer to upload data for the specified item, + // using the upload session URL to back an `itemWriter`. + // TODO: @vkamra verify if var session is the desired input + w := graph.NewLargeItemWriter(itemID, ptr.Val(r.GetUploadUrl()), ss.Size()) + + pname := name + iReader := itemData.ToReader() + + if i > 0 { + pname = fmt.Sprintf("%s (retry %d)", name, i) + + // If it is not the first try, we have to pull the file + // again from kopia.
Ideally we could just seek the stream + // but we don't have a Seeker available here. + itemData, err := fetcher.Fetch(ctx, itemData.UUID()) + if err != nil { + return "", details.ItemInfo{}, clues.Wrap(err, "get data file") + } + + iReader = itemData.ToReader() + } + + progReader, abort := observe.ItemProgress( + ctx, + iReader, + observe.ItemRestoreMsg, + clues.Hide(pname), + ss.Size()) + + // Upload the stream data + written, err = io.CopyBuffer(w, progReader, copyBuffer) + if err == nil { + break + } + + // clear out the bar if err + abort() + } - // Upload the stream data - written, err := io.CopyBuffer(w, progReader, copyBuffer) if err != nil { - return "", details.ItemInfo{}, graph.Wrap(ctx, err, "writing item bytes") + return "", details.ItemInfo{}, clues.Wrap(err, "uploading file") } dii := details.ItemInfo{} diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index 87bd268d7..da63e9b89 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -183,11 +183,10 @@ func (sc *Collection) runPopulate(ctx context.Context, errs *fault.Bus) (support ) // TODO: Insert correct ID for CollectionProgress - colProgress, closer := observe.CollectionProgress( + colProgress := observe.CollectionProgress( ctx, sc.fullPath.Category().String(), sc.fullPath.Folders()) - go closer() defer func() { close(colProgress) diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index e23de9493..f7f0f2481 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -61,10 +61,9 @@ func DataCollections( break } - foldersComplete, closer := observe.MessageWithCompletion( + foldersComplete := observe.MessageWithCompletion( ctx, observe.Bulletf("%s", scope.Category().PathType())) - defer closer() defer close(foldersComplete) var spcs []data.BackupCollection diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index f889a25b9..2f15152a8 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "runtime/trace" + "sync" "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -48,6 +49,7 @@ func RestoreCollections( opts control.Options, dcs []data.RestoreCollection, deets *details.Builder, + pool *sync.Pool, errs *fault.Bus, ) (*support.ConnectorOperationStatus, error) { var ( @@ -90,6 +92,7 @@ func RestoreCollections( dest.ContainerName, deets, opts.RestorePermissions, + pool, errs) case path.ListsCategory: diff --git a/src/internal/observe/observe.go b/src/internal/observe/observe.go index ea822f431..bd94b6531 100644 --- a/src/internal/observe/observe.go +++ b/src/internal/observe/observe.go @@ -180,7 +180,7 @@ func Message(ctx context.Context, msgs ...any) { func MessageWithCompletion( ctx context.Context, msg any, -) (chan<- struct{}, func()) { +) chan<- struct{} { var ( plain = plainString(msg) loggable = fmt.Sprintf("%v", msg) @@ -191,7 +191,8 @@ func MessageWithCompletion( log.Info(loggable) if cfg.hidden() { - return ch, func() { log.Info("done - " + loggable) } + defer log.Info("done - " + loggable) + return ch } wg.Add(1) @@ -219,11 +220,11 @@ func MessageWithCompletion( bar.SetTotal(-1, true) }) - wacb := waitAndCloseBar(bar, func() { + go waitAndCloseBar(bar, func() { log.Info("done - " + 
loggable) - }) + })() - return ch, wacb + return ch } // --------------------------------------------------------------------------- @@ -247,7 +248,8 @@ func ItemProgress( log.Debug(header) if cfg.hidden() || rc == nil || totalBytes == 0 { - return rc, func() { log.Debug("done - " + header) } + defer log.Debug("done - " + header) + return rc, func() {} } wg.Add(1) @@ -266,12 +268,17 @@ func ItemProgress( bar := progress.New(totalBytes, mpb.NopStyle(), barOpts...) - wacb := waitAndCloseBar(bar, func() { + go waitAndCloseBar(bar, func() { // might be overly chatty, we can remove if needed. log.Debug("done - " + header) - }) + })() - return bar.ProxyReader(rc), wacb + abort := func() { + bar.SetTotal(-1, true) + bar.Abort(true) + } + + return bar.ProxyReader(rc), abort } // ProgressWithCount tracks the display of a bar that tracks the completion @@ -283,7 +290,7 @@ func ProgressWithCount( header string, msg any, count int64, -) (chan<- struct{}, func()) { +) chan<- struct{} { var ( plain = plainString(msg) loggable = fmt.Sprintf("%s %v - %d", header, msg, count) @@ -295,7 +302,10 @@ func ProgressWithCount( if cfg.hidden() { go listen(ctx, ch, nop, nop) - return ch, func() { log.Info("done - " + loggable) } + + defer log.Info("done - " + loggable) + + return ch } wg.Add(1) @@ -319,11 +329,11 @@ func ProgressWithCount( func() { bar.Abort(true) }, bar.Increment) - wacb := waitAndCloseBar(bar, func() { + go waitAndCloseBar(bar, func() { log.Info("done - " + loggable) - }) + })() - return ch, wacb + return ch } // --------------------------------------------------------------------------- @@ -365,7 +375,7 @@ func CollectionProgress( ctx context.Context, category string, dirName any, -) (chan<- struct{}, func()) { +) chan<- struct{} { var ( counted int plain = plainString(dirName) @@ -388,7 +398,10 @@ func CollectionProgress( if cfg.hidden() || len(plain) == 0 { go listen(ctx, ch, nop, incCount) - return ch, func() { log.Infow("done - "+message, "count", counted) } + + defer log.Infow("done - "+message, "count", counted) + + return ch } wg.Add(1) @@ -420,18 +433,21 @@ func CollectionProgress( bar.Increment() }) - wacb := waitAndCloseBar(bar, func() { + go waitAndCloseBar(bar, func() { log.Infow("done - "+message, "count", counted) - }) + })() - return ch, wacb + return ch } func waitAndCloseBar(bar *mpb.Bar, log func()) func() { return func() { bar.Wait() wg.Done() - log() + + if !bar.Aborted() { + log() + } } } diff --git a/src/internal/observe/observe_test.go b/src/internal/observe/observe_test.go index 4fc36a912..08365462a 100644 --- a/src/internal/observe/observe_test.go +++ b/src/internal/observe/observe_test.go @@ -51,16 +51,14 @@ func (suite *ObserveProgressUnitSuite) TestItemProgress() { }() from := make([]byte, 100) - prog, closer := ItemProgress( + prog, abort := ItemProgress( ctx, io.NopCloser(bytes.NewReader(from)), "folder", tst, 100) require.NotNil(t, prog) - require.NotNil(t, closer) - - defer closer() + require.NotNil(t, abort) var i int @@ -105,9 +103,8 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnCtxCancel SeedWriter(context.Background(), nil, nil) }() - progCh, closer := CollectionProgress(ctx, testcat, testertons) + progCh := CollectionProgress(ctx, testcat, testertons) require.NotNil(t, progCh) - require.NotNil(t, closer) defer close(progCh) @@ -119,9 +116,6 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnCtxCancel time.Sleep(1 * time.Second) cancel() }() - - // blocks, but should resolve due to the ctx cancel - closer() } 
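The retry block in restoreData above only retries the byte upload itself: the upload session is created once, and because the kopia-backed reader has no Seeker, every attempt after the first re-fetches the item and restarts the upload from zero. A sketch of that shape; `fetchItem` and `uploadOnce` are hypothetical hooks standing in for the fileFetcher and the Graph large-item writer:

```go
// Sketch of the upload retry shape used in restoreData: the source stream has
// no Seeker, so each retry re-fetches the bytes and restarts the upload.
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

const maxUploadRetries = 3

func uploadWithRetry(
	initial io.Reader,
	fetchItem func() (io.Reader, error),
	uploadOnce func(io.Reader) error,
) error {
	var err error

	reader := initial

	for attempt := 0; attempt <= maxUploadRetries; attempt++ {
		if attempt > 0 {
			// Graph doesn't keep partial uploads, so start over with fresh bytes.
			reader, err = fetchItem()
			if err != nil {
				return fmt.Errorf("get data file: %w", err)
			}
		}

		if err = uploadOnce(reader); err == nil {
			return nil
		}
		// on error: loop around, re-fetch, and try the whole upload again
	}

	return fmt.Errorf("uploading file: %w", err)
}

func main() {
	failures := 2 // pretend the first two attempts hit transient errors

	err := uploadWithRetry(
		bytes.NewReader([]byte("payload")),
		func() (io.Reader, error) { return bytes.NewReader([]byte("payload")), nil },
		func(r io.Reader) error {
			if _, copyErr := io.Copy(io.Discard, r); copyErr != nil {
				return copyErr
			}
			if failures > 0 {
				failures--
				return errors.New("transient upload failure")
			}
			return nil
		},
	)

	fmt.Println("result:", err) // result: <nil> after two retries
}
```

The abort function now returned by observe.ItemProgress exists for exactly this case, so the bar for a failed attempt is cleared instead of lingering until the operation completes.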
func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnChannelClose() { @@ -140,9 +134,8 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnChannelCl SeedWriter(context.Background(), nil, nil) }() - progCh, closer := CollectionProgress(ctx, testcat, testertons) + progCh := CollectionProgress(ctx, testcat, testertons) require.NotNil(t, progCh) - require.NotNil(t, closer) for i := 0; i < 50; i++ { progCh <- struct{}{} @@ -152,9 +145,6 @@ func (suite *ObserveProgressUnitSuite) TestCollectionProgress_unblockOnChannelCl time.Sleep(1 * time.Second) close(progCh) }() - - // blocks, but should resolve due to the cancel - closer() } func (suite *ObserveProgressUnitSuite) TestObserveProgress() { @@ -197,14 +187,11 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCompletion() { message := "Test Message" - ch, closer := MessageWithCompletion(ctx, message) + ch := MessageWithCompletion(ctx, message) // Trigger completion ch <- struct{}{} - // Run the closer - this should complete because the bar was compelted above - closer() - Complete() require.NotEmpty(t, recorder.String()) @@ -229,14 +216,11 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithChannelClosed() { message := "Test Message" - ch, closer := MessageWithCompletion(ctx, message) + ch := MessageWithCompletion(ctx, message) // Close channel without completing close(ch) - // Run the closer - this should complete because the channel was closed above - closer() - Complete() require.NotEmpty(t, recorder.String()) @@ -263,14 +247,11 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithContextCancelled() message := "Test Message" - _, closer := MessageWithCompletion(ctx, message) + _ = MessageWithCompletion(ctx, message) // cancel context cancel() - // Run the closer - this should complete because the context was closed above - closer() - Complete() require.NotEmpty(t, recorder.String()) @@ -296,15 +277,12 @@ func (suite *ObserveProgressUnitSuite) TestObserveProgressWithCount() { message := "Test Message" count := 3 - ch, closer := ProgressWithCount(ctx, header, message, int64(count)) + ch := ProgressWithCount(ctx, header, message, int64(count)) for i := 0; i < count; i++ { ch <- struct{}{} } - // Run the closer - this should complete because the context was closed above - closer() - Complete() require.NotEmpty(t, recorder.String()) @@ -331,13 +309,10 @@ func (suite *ObserveProgressUnitSuite) TestrogressWithCountChannelClosed() { message := "Test Message" count := 3 - ch, closer := ProgressWithCount(ctx, header, message, int64(count)) + ch := ProgressWithCount(ctx, header, message, int64(count)) close(ch) - // Run the closer - this should complete because the context was closed above - closer() - Complete() require.NotEmpty(t, recorder.String()) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 5ad400bb3..db7a007fa 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -407,11 +407,10 @@ func produceBackupDataCollections( ctrlOpts control.Options, errs *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, error) { - complete, closer := observe.MessageWithCompletion(ctx, "Discovering items to backup") + complete := observe.MessageWithCompletion(ctx, "Discovering items to backup") defer func() { complete <- struct{}{} close(complete) - closer() }() return bp.ProduceBackupCollections( @@ -490,11 +489,10 @@ func consumeBackupCollections( isIncremental bool, errs *fault.Bus, ) 
(*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) { - complete, closer := observe.MessageWithCompletion(ctx, "Backing up data") + complete := observe.MessageWithCompletion(ctx, "Backing up data") defer func() { complete <- struct{}{} close(complete) - closer() }() tags := map[string]string{ diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 5b2c9bd71..ab0b5a4c8 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -236,8 +236,7 @@ func (op *RestoreOperation) do( observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)) - kopiaComplete, closer := observe.MessageWithCompletion(ctx, "Enumerating items in repository") - defer closer() + kopiaComplete := observe.MessageWithCompletion(ctx, "Enumerating items in repository") defer close(kopiaComplete) dcs, err := op.kopia.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) @@ -322,11 +321,10 @@ func consumeRestoreCollections( dcs []data.RestoreCollection, errs *fault.Bus, ) (*details.Details, error) { - complete, closer := observe.MessageWithCompletion(ctx, "Restoring data") + complete := observe.MessageWithCompletion(ctx, "Restoring data") defer func() { complete <- struct{}{} close(complete) - closer() }() deets, err := rc.ConsumeRestoreCollections( diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 7f37f4eaf..fbe374223 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -203,8 +203,7 @@ func Connect( // their output getting clobbered (#1720) defer observe.Complete() - complete, closer := observe.MessageWithCompletion(ctx, "Connecting to repository") - defer closer() + complete := observe.MessageWithCompletion(ctx, "Connecting to repository") defer close(complete) kopiaRef := kopia.NewConn(s) @@ -630,11 +629,10 @@ func connectToM365( sel selectors.Selector, acct account.Account, ) (*connector.GraphConnector, error) { - complete, closer := observe.MessageWithCompletion(ctx, "Connecting to M365") + complete := observe.MessageWithCompletion(ctx, "Connecting to M365") defer func() { complete <- struct{}{} close(complete) - closer() }() // retrieve data from the producer
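Across all of these call sites, the observe helpers no longer hand back a closer: the goroutine that waits on the bar is started inside the helper, and the caller gets only a send channel, marking completion by sending on it or closing it. A reduced, hypothetical stand-in for observe.MessageWithCompletion that shows the caller-side contract used in operations/restore.go, operations/backup.go, and pkg/repository/repository.go:

```go
// Hypothetical, reduced stand-in for observe.MessageWithCompletion after this
// change: the helper returns only chan<- struct{}, and either a send or a
// close on that channel marks the spinner as done.
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func messageWithCompletion(msg string) chan<- struct{} {
	ch := make(chan struct{}, 1)

	wg.Add(1)

	go func() {
		defer wg.Done()

		<-ch // unblocks on a completion signal, a close, or (in the real code) ctx.Done
		fmt.Println("done -", msg)
	}()

	return ch
}

func main() {
	complete := messageWithCompletion("Restoring data")

	// ... produce and consume the restore collections ...

	// Caller pattern from the operations and repository code above:
	// signal completion, then close the channel.
	complete <- struct{}{}
	close(complete)

	wg.Wait() // stand-in for observe.Complete(), which flushes pending output
}
```

Context cancellation and channel close unblock the same wait in the real implementation, which is what the updated observe tests exercise.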