Refactor populateItems (#3732)

<!-- PR description-->

Benefits:
* Narrowed scope & better testability. 
* Preps it for follow up PRs which will inject per-item corso extension interfaces
* Not adding unit tests yet, as the args & outputs of this new function will change in a follow up PR.
---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-07-05 11:52:21 -07:00 committed by GitHub
parent b0d3d4f3ee
commit 6fff3f7d1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -403,17 +403,21 @@ func readItemContents(
return rc, nil return rc, nil
} }
type driveStats struct {
dirsRead int64
itemsRead int64
byteCount int64
itemsFound int64
dirsFound int64
}
// populateItems iterates through items added to the collection // populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item // and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
var ( var (
byteCount int64 el = errs.Local()
itemsRead int64 stats driveStats
dirsRead int64 wg sync.WaitGroup
itemsFound int64
dirsFound int64
wg sync.WaitGroup
el = errs.Local()
) )
// Retrieve the OneDrive folder path to set later in // Retrieve the OneDrive folder path to set later in
@ -445,122 +449,137 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
wg.Add(1) wg.Add(1)
go func(ctx context.Context, item models.DriveItemable) { go func(item models.DriveItemable) {
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
// Read the item // Read the item
var ( oc.populateDriveItem(
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
itemSize = ptr.Val(item.GetSize())
itemInfo details.ItemInfo
itemMeta io.ReadCloser
itemMetaSize int
metaFileName string
metaSuffix string
err error
)
ctx = clues.Add(
ctx, ctx,
"item_id", itemID, parentPath,
"item_name", clues.Hide(itemName), item,
"item_size", itemSize) &stats,
errs)
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
isFile := item.GetFile() != nil
if isFile {
atomic.AddInt64(&itemsFound, 1)
metaFileName = itemID
metaSuffix = metadata.MetaFileSuffix
} else {
atomic.AddInt64(&dirsFound, 1)
// metaFileName not set for directories so we get just ".dirmeta"
metaSuffix = metadata.DirMetaFileSuffix
}
// Fetch metadata for the file
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
return
}
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
ctx = clues.Add(ctx, "item_info", itemInfo)
if isFile {
dataSuffix := metadata.DataFileSuffix
// Construct a new lazy readCloser to feed to the collection consumer.
// This ensures that downloads won't be attempted unless that consumer
// attempts to read bytes. Assumption is that kopia will check things
// like file modtimes before attempting to read.
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
if err != nil {
return nil, err
}
// display/log the item download
progReader, _ := observe.ItemProgress(
ctx,
itemData,
observe.ItemBackupMsg,
clues.Hide(itemName+dataSuffix),
itemSize)
return progReader, nil
})
oc.data <- &Item{
id: itemID + dataSuffix,
data: itemReader,
info: itemInfo,
}
}
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
progReader, _ := observe.ItemProgress(
ctx,
itemMeta,
observe.ItemBackupMsg,
clues.Hide(itemName+metaSuffix),
int64(itemMetaSize))
return progReader, nil
})
oc.data <- &metadata.Item{
ID: metaFileName + metaSuffix,
Data: metaReader,
// Metadata file should always use the latest time as
// permissions change does not update mod time.
Mod: time.Now(),
}
// Item read successfully, add to collection
if isFile {
atomic.AddInt64(&itemsRead, 1)
} else {
atomic.AddInt64(&dirsRead, 1)
}
// byteCount iteration
atomic.AddInt64(&byteCount, itemSize)
folderProgress <- struct{}{} folderProgress <- struct{}{}
}(ctx, item) }(item)
} }
wg.Wait() wg.Wait()
oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount) oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
}
func (oc *Collection) populateDriveItem(
ctx context.Context,
parentPath *path.Builder,
item models.DriveItemable,
stats *driveStats,
errs *fault.Bus,
) {
var (
el = errs.Local()
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
itemSize = ptr.Val(item.GetSize())
itemInfo details.ItemInfo
itemMeta io.ReadCloser
itemMetaSize int
metaFileName string
metaSuffix string
err error
)
ctx = clues.Add(
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_size", itemSize)
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
isFile := item.GetFile() != nil
if isFile {
atomic.AddInt64(&stats.itemsFound, 1)
metaFileName = itemID
metaSuffix = metadata.MetaFileSuffix
} else {
atomic.AddInt64(&stats.dirsFound, 1)
// metaFileName not set for directories so we get just ".dirmeta"
metaSuffix = metadata.DirMetaFileSuffix
}
// Fetch metadata for the file
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
return
}
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
ctx = clues.Add(ctx, "item_info", itemInfo)
if isFile {
dataSuffix := metadata.DataFileSuffix
// Construct a new lazy readCloser to feed to the collection consumer.
// This ensures that downloads won't be attempted unless that consumer
// attempts to read bytes. Assumption is that kopia will check things
// like file modtimes before attempting to read.
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
if err != nil {
return nil, err
}
// display/log the item download
progReader, _ := observe.ItemProgress(
ctx,
itemData,
observe.ItemBackupMsg,
clues.Hide(itemName+dataSuffix),
itemSize)
return progReader, nil
})
oc.data <- &Item{
id: itemID + dataSuffix,
data: itemReader,
info: itemInfo,
}
}
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
progReader, _ := observe.ItemProgress(
ctx,
itemMeta,
observe.ItemBackupMsg,
clues.Hide(itemName+metaSuffix),
int64(itemMetaSize))
return progReader, nil
})
oc.data <- &metadata.Item{
ID: metaFileName + metaSuffix,
Data: metaReader,
// Metadata file should always use the latest time as
// permissions change does not update mod time.
Mod: time.Now(),
}
// Item read successfully, add to collection
if isFile {
atomic.AddInt64(&stats.itemsRead, 1)
} else {
atomic.AddInt64(&stats.dirsRead, 1)
}
atomic.AddInt64(&stats.byteCount, itemSize)
} }
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) { func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {