Refactor drive item reader
This commit is contained in:
parent
a7ae09072c
commit
e8fb164f18
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/observe"
|
"github.com/alcionai/corso/src/internal/observe"
|
||||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
|
"github.com/alcionai/corso/src/pkg/extensions"
|
||||||
"github.com/alcionai/corso/src/pkg/fault"
|
"github.com/alcionai/corso/src/pkg/fault"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
@ -403,17 +404,21 @@ func readItemContents(
|
|||||||
return rc, nil
|
return rc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type driveStats struct {
|
||||||
|
dirsRead int64
|
||||||
|
itemsRead int64
|
||||||
|
byteCount int64
|
||||||
|
itemsFound int64
|
||||||
|
dirsFound int64
|
||||||
|
}
|
||||||
|
|
||||||
// populateItems iterates through items added to the collection
|
// populateItems iterates through items added to the collection
|
||||||
// and uses the collection `itemReader` to read the item
|
// and uses the collection `itemReader` to read the item
|
||||||
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||||
var (
|
var (
|
||||||
byteCount int64
|
el = errs.Local()
|
||||||
itemsRead int64
|
stats driveStats
|
||||||
dirsRead int64
|
wg sync.WaitGroup
|
||||||
itemsFound int64
|
|
||||||
dirsFound int64
|
|
||||||
wg sync.WaitGroup
|
|
||||||
el = errs.Local()
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Retrieve the OneDrive folder path to set later in
|
// Retrieve the OneDrive folder path to set later in
|
||||||
@ -445,122 +450,168 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func(ctx context.Context, item models.DriveItemable) {
|
go func(item models.DriveItemable) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer func() { <-semaphoreCh }()
|
defer func() { <-semaphoreCh }()
|
||||||
|
|
||||||
// Read the item
|
// Read the item
|
||||||
var (
|
oc.populateDriveItem(
|
||||||
itemID = ptr.Val(item.GetId())
|
ctx,
|
||||||
itemName = ptr.Val(item.GetName())
|
parentPath,
|
||||||
itemSize = ptr.Val(item.GetSize())
|
item,
|
||||||
itemInfo details.ItemInfo
|
&stats,
|
||||||
itemMeta io.ReadCloser
|
&extensions.ItemExtensionHandler{},
|
||||||
itemMetaSize int
|
oc.ctrl.BackupItemExtensions,
|
||||||
metaFileName string
|
errs,
|
||||||
metaSuffix string
|
|
||||||
err error
|
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx = clues.Add(
|
|
||||||
ctx,
|
|
||||||
"item_id", itemID,
|
|
||||||
"item_name", clues.Hide(itemName),
|
|
||||||
"item_size", itemSize)
|
|
||||||
|
|
||||||
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
|
|
||||||
|
|
||||||
isFile := item.GetFile() != nil
|
|
||||||
|
|
||||||
if isFile {
|
|
||||||
atomic.AddInt64(&itemsFound, 1)
|
|
||||||
|
|
||||||
metaFileName = itemID
|
|
||||||
metaSuffix = metadata.MetaFileSuffix
|
|
||||||
} else {
|
|
||||||
atomic.AddInt64(&dirsFound, 1)
|
|
||||||
|
|
||||||
// metaFileName not set for directories so we get just ".dirmeta"
|
|
||||||
metaSuffix = metadata.DirMetaFileSuffix
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch metadata for the file
|
|
||||||
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
|
|
||||||
if err != nil {
|
|
||||||
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
|
|
||||||
|
|
||||||
ctx = clues.Add(ctx, "item_info", itemInfo)
|
|
||||||
|
|
||||||
if isFile {
|
|
||||||
dataSuffix := metadata.DataFileSuffix
|
|
||||||
|
|
||||||
// Construct a new lazy readCloser to feed to the collection consumer.
|
|
||||||
// This ensures that downloads won't be attempted unless that consumer
|
|
||||||
// attempts to read bytes. Assumption is that kopia will check things
|
|
||||||
// like file modtimes before attempting to read.
|
|
||||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
|
||||||
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// display/log the item download
|
|
||||||
progReader, _ := observe.ItemProgress(
|
|
||||||
ctx,
|
|
||||||
itemData,
|
|
||||||
observe.ItemBackupMsg,
|
|
||||||
clues.Hide(itemName+dataSuffix),
|
|
||||||
itemSize)
|
|
||||||
|
|
||||||
return progReader, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
oc.data <- &Item{
|
|
||||||
id: itemID + dataSuffix,
|
|
||||||
data: itemReader,
|
|
||||||
info: itemInfo,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
|
||||||
progReader, _ := observe.ItemProgress(
|
|
||||||
ctx,
|
|
||||||
itemMeta,
|
|
||||||
observe.ItemBackupMsg,
|
|
||||||
clues.Hide(itemName+metaSuffix),
|
|
||||||
int64(itemMetaSize))
|
|
||||||
return progReader, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
oc.data <- &metadata.Item{
|
|
||||||
ID: metaFileName + metaSuffix,
|
|
||||||
Data: metaReader,
|
|
||||||
// Metadata file should always use the latest time as
|
|
||||||
// permissions change does not update mod time.
|
|
||||||
Mod: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Item read successfully, add to collection
|
|
||||||
if isFile {
|
|
||||||
atomic.AddInt64(&itemsRead, 1)
|
|
||||||
} else {
|
|
||||||
atomic.AddInt64(&dirsRead, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// byteCount iteration
|
|
||||||
atomic.AddInt64(&byteCount, itemSize)
|
|
||||||
|
|
||||||
folderProgress <- struct{}{}
|
folderProgress <- struct{}{}
|
||||||
}(ctx, item)
|
}(item) // TODO: is copy okay here?
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount)
|
oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (oc *Collection) populateDriveItem(
|
||||||
|
ctx context.Context,
|
||||||
|
parentPath *path.Builder,
|
||||||
|
item models.DriveItemable,
|
||||||
|
stats *driveStats,
|
||||||
|
aie extensions.AddItemExtensioner,
|
||||||
|
factories []extensions.CorsoItemExtensionFactory,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) {
|
||||||
|
var (
|
||||||
|
el = errs.Local()
|
||||||
|
itemID = ptr.Val(item.GetId())
|
||||||
|
itemName = ptr.Val(item.GetName())
|
||||||
|
itemSize = ptr.Val(item.GetSize())
|
||||||
|
itemInfo details.ItemInfo
|
||||||
|
itemMeta io.ReadCloser
|
||||||
|
itemMetaSize int
|
||||||
|
metaFileName string
|
||||||
|
metaSuffix string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx = clues.Add(
|
||||||
|
ctx,
|
||||||
|
"item_id", itemID,
|
||||||
|
"item_name", clues.Hide(itemName),
|
||||||
|
"item_size", itemSize)
|
||||||
|
|
||||||
|
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
|
||||||
|
|
||||||
|
isFile := item.GetFile() != nil
|
||||||
|
|
||||||
|
if isFile {
|
||||||
|
atomic.AddInt64(&stats.itemsFound, 1)
|
||||||
|
|
||||||
|
metaFileName = itemID
|
||||||
|
metaSuffix = metadata.MetaFileSuffix
|
||||||
|
} else {
|
||||||
|
atomic.AddInt64(&stats.dirsFound, 1)
|
||||||
|
|
||||||
|
// metaFileName not set for directories so we get just ".dirmeta"
|
||||||
|
metaSuffix = metadata.DirMetaFileSuffix
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch metadata for the file
|
||||||
|
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
|
||||||
|
if err != nil {
|
||||||
|
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
|
||||||
|
|
||||||
|
ctx = clues.Add(ctx, "item_info", itemInfo)
|
||||||
|
|
||||||
|
if isFile {
|
||||||
|
dataSuffix := metadata.DataFileSuffix
|
||||||
|
|
||||||
|
// Construct a new lazy readCloser to feed to the collection consumer.
|
||||||
|
// This ensures that downloads won't be attempted unless that consumer
|
||||||
|
// attempts to read bytes. Assumption is that kopia will check things
|
||||||
|
// like file modtimes before attempting to read.
|
||||||
|
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||||
|
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if aie != nil && len(factories) != 0 {
|
||||||
|
logger.Ctx(ctx).Info("enabling drive item extensions")
|
||||||
|
|
||||||
|
extRc, extInfo, err := aie.AddItemExtensions(
|
||||||
|
ctx,
|
||||||
|
itemData,
|
||||||
|
itemInfo,
|
||||||
|
oc.ctrl.BackupItemExtensions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "adding item extensions")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extInfo == nil {
|
||||||
|
return nil, clues.New("nil extension info")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extRc == nil {
|
||||||
|
return nil, clues.New("nil extension reader")
|
||||||
|
}
|
||||||
|
|
||||||
|
itemInfo.OneDrive.Extension = extInfo
|
||||||
|
itemData = extRc
|
||||||
|
} else {
|
||||||
|
logger.Ctx(ctx).Info("drive item extensions disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// display/log the item download
|
||||||
|
progReader, _ := observe.ItemProgress(
|
||||||
|
ctx,
|
||||||
|
itemData,
|
||||||
|
observe.ItemBackupMsg,
|
||||||
|
clues.Hide(itemName+dataSuffix),
|
||||||
|
itemSize)
|
||||||
|
|
||||||
|
return progReader, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
oc.data <- &Item{
|
||||||
|
id: itemID + dataSuffix,
|
||||||
|
data: itemReader,
|
||||||
|
info: itemInfo,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||||
|
progReader, _ := observe.ItemProgress(
|
||||||
|
ctx,
|
||||||
|
itemMeta,
|
||||||
|
observe.ItemBackupMsg,
|
||||||
|
clues.Hide(itemName+metaSuffix),
|
||||||
|
int64(itemMetaSize))
|
||||||
|
return progReader, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
oc.data <- &metadata.Item{
|
||||||
|
ID: metaFileName + metaSuffix,
|
||||||
|
Data: metaReader,
|
||||||
|
// Metadata file should always use the latest time as
|
||||||
|
// permissions change does not update mod time.
|
||||||
|
Mod: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Item read successfully, add to collection
|
||||||
|
if isFile {
|
||||||
|
atomic.AddInt64(&stats.itemsRead, 1)
|
||||||
|
} else {
|
||||||
|
atomic.AddInt64(&stats.dirsRead, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.AddInt64(&stats.byteCount, itemSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {
|
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {
|
||||||
|
|||||||
@ -978,16 +978,17 @@ func (i *SharePointInfo) updateFolder(f *FolderInfo) error {
|
|||||||
|
|
||||||
// OneDriveInfo describes a oneDrive item
|
// OneDriveInfo describes a oneDrive item
|
||||||
type OneDriveInfo struct {
|
type OneDriveInfo struct {
|
||||||
Created time.Time `json:"created,omitempty"`
|
Created time.Time `json:"created,omitempty"`
|
||||||
DriveID string `json:"driveID,omitempty"`
|
DriveID string `json:"driveID,omitempty"`
|
||||||
DriveName string `json:"driveName,omitempty"`
|
DriveName string `json:"driveName,omitempty"`
|
||||||
IsMeta bool `json:"isMeta,omitempty"`
|
IsMeta bool `json:"isMeta,omitempty"`
|
||||||
ItemName string `json:"itemName,omitempty"`
|
ItemName string `json:"itemName,omitempty"`
|
||||||
ItemType ItemType `json:"itemType,omitempty"`
|
ItemType ItemType `json:"itemType,omitempty"`
|
||||||
Modified time.Time `json:"modified,omitempty"`
|
Modified time.Time `json:"modified,omitempty"`
|
||||||
Owner string `json:"owner,omitempty"`
|
Owner string `json:"owner,omitempty"`
|
||||||
ParentPath string `json:"parentPath"`
|
ParentPath string `json:"parentPath"`
|
||||||
Size int64 `json:"size,omitempty"`
|
Size int64 `json:"size,omitempty"`
|
||||||
|
Extension *ExtensionInfo `json:"extensionData,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Headers returns the human-readable names of properties in a OneDriveInfo
|
// Headers returns the human-readable names of properties in a OneDriveInfo
|
||||||
|
|||||||
@ -8,18 +8,20 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common/dttm"
|
"github.com/alcionai/corso/src/internal/common/dttm"
|
||||||
"github.com/alcionai/corso/src/pkg/control/repository"
|
"github.com/alcionai/corso/src/pkg/control/repository"
|
||||||
|
"github.com/alcionai/corso/src/pkg/extensions"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Options holds the optional configurations for a process
|
// Options holds the optional configurations for a process
|
||||||
type Options struct {
|
type Options struct {
|
||||||
DisableMetrics bool `json:"disableMetrics"`
|
DisableMetrics bool `json:"disableMetrics"`
|
||||||
FailureHandling FailurePolicy `json:"failureHandling"`
|
FailureHandling FailurePolicy `json:"failureHandling"`
|
||||||
RestorePermissions bool `json:"restorePermissions"`
|
RestorePermissions bool `json:"restorePermissions"`
|
||||||
SkipReduce bool `json:"skipReduce"`
|
SkipReduce bool `json:"skipReduce"`
|
||||||
ToggleFeatures Toggles `json:"toggleFeatures"`
|
ToggleFeatures Toggles `json:"toggleFeatures"`
|
||||||
Parallelism Parallelism `json:"parallelism"`
|
Parallelism Parallelism `json:"parallelism"`
|
||||||
Repo repository.Options `json:"repo"`
|
Repo repository.Options `json:"repo"`
|
||||||
|
BackupItemExtensions []extensions.CorsoItemExtensionFactory `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Parallelism struct {
|
type Parallelism struct {
|
||||||
@ -49,6 +51,7 @@ func Defaults() Options {
|
|||||||
CollectionBuffer: 4,
|
CollectionBuffer: 4,
|
||||||
ItemFetch: 4,
|
ItemFetch: 4,
|
||||||
},
|
},
|
||||||
|
BackupItemExtensions: []extensions.CorsoItemExtensionFactory{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user