Minor refactor of status update code (#4912)

<!-- PR description-->

I missed this change in https://github.com/alcionai/corso/pull/4906. Moving out the status updater code out of `prefetchCollection` scope so that `lazyFetchCollection` can also use it in upcoming PRs.


---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-12-22 17:15:48 -08:00 committed by GitHub
parent ddf29e98dd
commit 1940d3a1f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -28,6 +28,33 @@ const (
numberOfRetries = 4 numberOfRetries = 4
) )
// updateStatus is a utility function used to send the status update through
// the channel.
func updateStatus(
ctx context.Context,
statusUpdater support.StatusUpdater,
attempted int,
streamedItems int64,
totalBytes int64,
folderPath string,
err error,
) {
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: int(streamedItems),
Bytes: totalBytes,
},
folderPath)
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
statusUpdater(status)
}
type prefetchCollection[C graph.GetIDer, I groupsItemer] struct { type prefetchCollection[C graph.GetIDer, I groupsItemer] struct {
data.BaseCollection data.BaseCollection
protectedResource string protectedResource string
@ -97,10 +124,19 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
ctx = clues.Add(ctx, "category", col.Category().String()) ctx = clues.Add(ctx, "category", col.Category().String())
defer func() { defer func() {
close(col.stream)
logger.Ctx(ctx).Infow( logger.Ctx(ctx).Infow(
"finished stream backup collection items", "finished stream backup collection items",
"stats", col.Counter.Values()) "stats", col.Counter.Values())
col.finishPopulation(ctx, streamedItems, totalBytes, errs.Failure())
updateStatus(
ctx,
col.statusUpdater,
len(col.added)+len(col.removed),
streamedItems,
totalBytes,
col.FullPath().Folder(false),
errs.Failure())
}() }()
if len(col.added)+len(col.removed) > 0 { if len(col.added)+len(col.removed) > 0 {
@ -212,29 +248,3 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
wg.Wait() wg.Wait()
} }
// finishPopulation is a utility function used to close a collection's data channel
// and to send the status update through the channel.
func (col *prefetchCollection[C, I]) finishPopulation(
ctx context.Context,
streamedItems, totalBytes int64,
err error,
) {
close(col.stream)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: int(streamedItems),
Bytes: totalBytes,
},
col.FullPath().Folder(false))
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
col.statusUpdater(status)
}