diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go index 197bee01f..902aeda13 100644 --- a/src/internal/m365/onedrive/collection.go +++ b/src/internal/m365/onedrive/collection.go @@ -403,17 +403,21 @@ func readItemContents( return rc, nil } +type driveStats struct { + dirsRead int64 + itemsRead int64 + byteCount int64 + itemsFound int64 + dirsFound int64 +} + // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { var ( - byteCount int64 - itemsRead int64 - dirsRead int64 - itemsFound int64 - dirsFound int64 - wg sync.WaitGroup - el = errs.Local() + el = errs.Local() + stats driveStats + wg sync.WaitGroup ) // Retrieve the OneDrive folder path to set later in @@ -445,122 +449,137 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { wg.Add(1) - go func(ctx context.Context, item models.DriveItemable) { + go func(item models.DriveItemable) { defer wg.Done() defer func() { <-semaphoreCh }() // Read the item - var ( - itemID = ptr.Val(item.GetId()) - itemName = ptr.Val(item.GetName()) - itemSize = ptr.Val(item.GetSize()) - itemInfo details.ItemInfo - itemMeta io.ReadCloser - itemMetaSize int - metaFileName string - metaSuffix string - err error - ) - - ctx = clues.Add( + oc.populateDriveItem( ctx, - "item_id", itemID, - "item_name", clues.Hide(itemName), - "item_size", itemSize) - - item.SetParentReference(setName(item.GetParentReference(), oc.driveName)) - - isFile := item.GetFile() != nil - - if isFile { - atomic.AddInt64(&itemsFound, 1) - - metaFileName = itemID - metaSuffix = metadata.MetaFileSuffix - } else { - atomic.AddInt64(&dirsFound, 1) - - // metaFileName not set for directories so we get just ".dirmeta" - metaSuffix = metadata.DirMetaFileSuffix - } - - // Fetch metadata for the file - itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item) - if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) - return - } - - itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath) - - ctx = clues.Add(ctx, "item_info", itemInfo) - - if isFile { - dataSuffix := metadata.DataFileSuffix - - // Construct a new lazy readCloser to feed to the collection consumer. - // This ensures that downloads won't be attempted unless that consumer - // attempts to read bytes. Assumption is that kopia will check things - // like file modtimes before attempting to read. - itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs) - if err != nil { - return nil, err - } - - // display/log the item download - progReader, _ := observe.ItemProgress( - ctx, - itemData, - observe.ItemBackupMsg, - clues.Hide(itemName+dataSuffix), - itemSize) - - return progReader, nil - }) - - oc.data <- &Item{ - id: itemID + dataSuffix, - data: itemReader, - info: itemInfo, - } - } - - metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - progReader, _ := observe.ItemProgress( - ctx, - itemMeta, - observe.ItemBackupMsg, - clues.Hide(itemName+metaSuffix), - int64(itemMetaSize)) - return progReader, nil - }) - - oc.data <- &metadata.Item{ - ID: metaFileName + metaSuffix, - Data: metaReader, - // Metadata file should always use the latest time as - // permissions change does not update mod time. - Mod: time.Now(), - } - - // Item read successfully, add to collection - if isFile { - atomic.AddInt64(&itemsRead, 1) - } else { - atomic.AddInt64(&dirsRead, 1) - } - - // byteCount iteration - atomic.AddInt64(&byteCount, itemSize) + parentPath, + item, + &stats, + errs) folderProgress <- struct{}{} - }(ctx, item) + }(item) } wg.Wait() - oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount) + oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount) +} + +func (oc *Collection) populateDriveItem( + ctx context.Context, + parentPath *path.Builder, + item models.DriveItemable, + stats *driveStats, + errs *fault.Bus, +) { + var ( + el = errs.Local() + itemID = ptr.Val(item.GetId()) + itemName = ptr.Val(item.GetName()) + itemSize = ptr.Val(item.GetSize()) + itemInfo details.ItemInfo + itemMeta io.ReadCloser + itemMetaSize int + metaFileName string + metaSuffix string + err error + ) + + ctx = clues.Add( + ctx, + "item_id", itemID, + "item_name", clues.Hide(itemName), + "item_size", itemSize) + + item.SetParentReference(setName(item.GetParentReference(), oc.driveName)) + + isFile := item.GetFile() != nil + + if isFile { + atomic.AddInt64(&stats.itemsFound, 1) + + metaFileName = itemID + metaSuffix = metadata.MetaFileSuffix + } else { + atomic.AddInt64(&stats.dirsFound, 1) + + // metaFileName not set for directories so we get just ".dirmeta" + metaSuffix = metadata.DirMetaFileSuffix + } + + // Fetch metadata for the file + itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item) + if err != nil { + el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) + return + } + + itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath) + + ctx = clues.Add(ctx, "item_info", itemInfo) + + if isFile { + dataSuffix := metadata.DataFileSuffix + + // Construct a new lazy readCloser to feed to the collection consumer. + // This ensures that downloads won't be attempted unless that consumer + // attempts to read bytes. Assumption is that kopia will check things + // like file modtimes before attempting to read. + itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { + itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs) + if err != nil { + return nil, err + } + + // display/log the item download + progReader, _ := observe.ItemProgress( + ctx, + itemData, + observe.ItemBackupMsg, + clues.Hide(itemName+dataSuffix), + itemSize) + + return progReader, nil + }) + + oc.data <- &Item{ + id: itemID + dataSuffix, + data: itemReader, + info: itemInfo, + } + } + + metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { + progReader, _ := observe.ItemProgress( + ctx, + itemMeta, + observe.ItemBackupMsg, + clues.Hide(itemName+metaSuffix), + int64(itemMetaSize)) + return progReader, nil + }) + + oc.data <- &metadata.Item{ + ID: metaFileName + metaSuffix, + Data: metaReader, + // Metadata file should always use the latest time as + // permissions change does not update mod time. + Mod: time.Now(), + } + + // Item read successfully, add to collection + if isFile { + atomic.AddInt64(&stats.itemsRead, 1) + } else { + atomic.AddInt64(&stats.dirsRead, 1) + } + + atomic.AddInt64(&stats.byteCount, itemSize) } func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {