diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go
index 197bee01f..cd2dc98b2 100644
--- a/src/internal/m365/onedrive/collection.go
+++ b/src/internal/m365/onedrive/collection.go
@@ -21,6 +21,7 @@ import (
 	"github.com/alcionai/corso/src/internal/observe"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/extensions"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
@@ -403,17 +404,21 @@ func readItemContents(
 	return rc, nil
 }

+type driveStats struct {
+	dirsRead   int64
+	itemsRead  int64
+	byteCount  int64
+	itemsFound int64
+	dirsFound  int64
+}
+
 // populateItems iterates through items added to the collection
 // and uses the collection `itemReader` to read the item
 func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
 	var (
-		byteCount  int64
-		itemsRead  int64
-		dirsRead   int64
-		itemsFound int64
-		dirsFound  int64
-		wg         sync.WaitGroup
-		el         = errs.Local()
+		el    = errs.Local()
+		stats driveStats
+		wg    sync.WaitGroup
 	)

 	// Retrieve the OneDrive folder path to set later in
@@ -445,122 +450,168 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {

 		wg.Add(1)

-		go func(ctx context.Context, item models.DriveItemable) {
+		go func(item models.DriveItemable) {
 			defer wg.Done()
 			defer func() { <-semaphoreCh }()

 			// Read the item
-			var (
-				itemID       = ptr.Val(item.GetId())
-				itemName     = ptr.Val(item.GetName())
-				itemSize     = ptr.Val(item.GetSize())
-				itemInfo     details.ItemInfo
-				itemMeta     io.ReadCloser
-				itemMetaSize int
-				metaFileName string
-				metaSuffix   string
-				err          error
+			oc.populateDriveItem(
+				ctx,
+				parentPath,
+				item,
+				&stats,
+				&extensions.ItemExtensionHandler{},
+				oc.ctrl.BackupItemExtensions,
+				errs,
 			)

-			ctx = clues.Add(
-				ctx,
-				"item_id", itemID,
-				"item_name", clues.Hide(itemName),
-				"item_size", itemSize)
-
-			item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
-
-			isFile := item.GetFile() != nil
-
-			if isFile {
-				atomic.AddInt64(&itemsFound, 1)
-
-				metaFileName = itemID
-				metaSuffix = metadata.MetaFileSuffix
-			} else {
-				atomic.AddInt64(&dirsFound, 1)
-
-				// metaFileName not set for directories so we get just ".dirmeta"
-				metaSuffix = metadata.DirMetaFileSuffix
-			}
-
-			// Fetch metadata for the file
-			itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
-			if err != nil {
-				el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
-				return
-			}
-
-			itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
-
-			ctx = clues.Add(ctx, "item_info", itemInfo)
-
-			if isFile {
-				dataSuffix := metadata.DataFileSuffix
-
-				// Construct a new lazy readCloser to feed to the collection consumer.
-				// This ensures that downloads won't be attempted unless that consumer
-				// attempts to read bytes. Assumption is that kopia will check things
-				// like file modtimes before attempting to read.
-				itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-					itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
-					if err != nil {
-						return nil, err
-					}
-
-					// display/log the item download
-					progReader, _ := observe.ItemProgress(
-						ctx,
-						itemData,
-						observe.ItemBackupMsg,
-						clues.Hide(itemName+dataSuffix),
-						itemSize)
-
-					return progReader, nil
-				})
-
-				oc.data <- &Item{
-					id:   itemID + dataSuffix,
-					data: itemReader,
-					info: itemInfo,
-				}
-			}
-
-			metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-				progReader, _ := observe.ItemProgress(
-					ctx,
-					itemMeta,
-					observe.ItemBackupMsg,
-					clues.Hide(itemName+metaSuffix),
-					int64(itemMetaSize))
-				return progReader, nil
-			})
-
-			oc.data <- &metadata.Item{
-				ID:   metaFileName + metaSuffix,
-				Data: metaReader,
-				// Metadata file should always use the latest time as
-				// permissions change does not update mod time.
-				Mod: time.Now(),
-			}
-
-			// Item read successfully, add to collection
-			if isFile {
-				atomic.AddInt64(&itemsRead, 1)
-			} else {
-				atomic.AddInt64(&dirsRead, 1)
-			}
-
-			// byteCount iteration
-			atomic.AddInt64(&byteCount, itemSize)
 			folderProgress <- struct{}{}
-		}(ctx, item)
+		}(item) // TODO: is copy okay here?
 	}

 	wg.Wait()

-	oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount)
+	oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
+}
+
+func (oc *Collection) populateDriveItem(
+	ctx context.Context,
+	parentPath *path.Builder,
+	item models.DriveItemable,
+	stats *driveStats,
+	aie extensions.AddItemExtensioner,
+	factories []extensions.CorsoItemExtensionFactory,
+	errs *fault.Bus,
+) {
+	var (
+		el           = errs.Local()
+		itemID       = ptr.Val(item.GetId())
+		itemName     = ptr.Val(item.GetName())
+		itemSize     = ptr.Val(item.GetSize())
+		itemInfo     details.ItemInfo
+		itemMeta     io.ReadCloser
+		itemMetaSize int
+		metaFileName string
+		metaSuffix   string
+		err          error
+	)
+
+	ctx = clues.Add(
+		ctx,
+		"item_id", itemID,
+		"item_name", clues.Hide(itemName),
+		"item_size", itemSize)
+
+	item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
+
+	isFile := item.GetFile() != nil
+
+	if isFile {
+		atomic.AddInt64(&stats.itemsFound, 1)
+
+		metaFileName = itemID
+		metaSuffix = metadata.MetaFileSuffix
+	} else {
+		atomic.AddInt64(&stats.dirsFound, 1)
+
+		// metaFileName not set for directories so we get just ".dirmeta"
+		metaSuffix = metadata.DirMetaFileSuffix
+	}
+
+	// Fetch metadata for the file
+	itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
+	if err != nil {
+		el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
+		return
+	}
+
+	itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
+
+	ctx = clues.Add(ctx, "item_info", itemInfo)
+
+	if isFile {
+		dataSuffix := metadata.DataFileSuffix
+
+		// Construct a new lazy readCloser to feed to the collection consumer.
+		// This ensures that downloads won't be attempted unless that consumer
+		// attempts to read bytes. Assumption is that kopia will check things
+		// like file modtimes before attempting to read.
+		itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+			itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
+			if err != nil {
+				return nil, err
+			}
+
+			if aie != nil && len(factories) != 0 {
+				logger.Ctx(ctx).Info("enabling drive item extensions")
+
+				extRc, extInfo, err := aie.AddItemExtensions(
+					ctx,
+					itemData,
+					itemInfo,
+					factories)
+				if err != nil {
+					return nil, clues.Wrap(err, "adding item extensions")
+				}
+
+				if extInfo == nil {
+					return nil, clues.New("nil extension info")
+				}
+
+				if extRc == nil {
+					return nil, clues.New("nil extension reader")
+				}
+
+				itemInfo.OneDrive.Extension = extInfo
+				itemData = extRc
+			} else {
+				logger.Ctx(ctx).Info("drive item extensions disabled")
+			}
+
+			// display/log the item download
+			progReader, _ := observe.ItemProgress(
+				ctx,
+				itemData,
+				observe.ItemBackupMsg,
+				clues.Hide(itemName+dataSuffix),
+				itemSize)
+
+			return progReader, nil
+		})
+
+		oc.data <- &Item{
+			id:   itemID + dataSuffix,
+			data: itemReader,
+			info: itemInfo,
+		}
+	}
+
+	metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		progReader, _ := observe.ItemProgress(
+			ctx,
+			itemMeta,
+			observe.ItemBackupMsg,
+			clues.Hide(itemName+metaSuffix),
+			int64(itemMetaSize))
+		return progReader, nil
+	})
+
+	oc.data <- &metadata.Item{
+		ID:   metaFileName + metaSuffix,
+		Data: metaReader,
+		// Metadata file should always use the latest time as
+		// permissions change does not update mod time.
+		Mod: time.Now(),
+	}
+
+	// Item read successfully, add to collection
+	if isFile {
+		atomic.AddInt64(&stats.itemsRead, 1)
+	} else {
+		atomic.AddInt64(&stats.dirsRead, 1)
+	}
+
+	atomic.AddInt64(&stats.byteCount, itemSize)
 }

 func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {
diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go
index 6497c30c0..0f4f5eec3 100644
--- a/src/pkg/backup/details/details.go
+++ b/src/pkg/backup/details/details.go
@@ -978,16 +978,17 @@ func (i *SharePointInfo) updateFolder(f *FolderInfo) error {

 // OneDriveInfo describes a oneDrive item
 type OneDriveInfo struct {
-	Created    time.Time `json:"created,omitempty"`
-	DriveID    string    `json:"driveID,omitempty"`
-	DriveName  string    `json:"driveName,omitempty"`
-	IsMeta     bool      `json:"isMeta,omitempty"`
-	ItemName   string    `json:"itemName,omitempty"`
-	ItemType   ItemType  `json:"itemType,omitempty"`
-	Modified   time.Time `json:"modified,omitempty"`
-	Owner      string    `json:"owner,omitempty"`
-	ParentPath string    `json:"parentPath"`
-	Size       int64     `json:"size,omitempty"`
+	Created    time.Time      `json:"created,omitempty"`
+	DriveID    string         `json:"driveID,omitempty"`
+	DriveName  string         `json:"driveName,omitempty"`
+	IsMeta     bool           `json:"isMeta,omitempty"`
+	ItemName   string         `json:"itemName,omitempty"`
+	ItemType   ItemType       `json:"itemType,omitempty"`
+	Modified   time.Time      `json:"modified,omitempty"`
+	Owner      string         `json:"owner,omitempty"`
+	ParentPath string         `json:"parentPath"`
+	Size       int64          `json:"size,omitempty"`
+	Extension  *ExtensionInfo `json:"extensionData,omitempty"`
 }

 // Headers returns the human-readable names of properties in a OneDriveInfo
diff --git a/src/pkg/control/options.go b/src/pkg/control/options.go
index d82907397..5fc96e92d 100644
--- a/src/pkg/control/options.go
+++ b/src/pkg/control/options.go
@@ -8,18 +8,20 @@ import (

 	"github.com/alcionai/corso/src/internal/common/dttm"
 	"github.com/alcionai/corso/src/pkg/control/repository"
+	"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/logger" ) // Options holds the optional configurations for a process type Options struct { - DisableMetrics bool `json:"disableMetrics"` - FailureHandling FailurePolicy `json:"failureHandling"` - RestorePermissions bool `json:"restorePermissions"` - SkipReduce bool `json:"skipReduce"` - ToggleFeatures Toggles `json:"toggleFeatures"` - Parallelism Parallelism `json:"parallelism"` - Repo repository.Options `json:"repo"` + DisableMetrics bool `json:"disableMetrics"` + FailureHandling FailurePolicy `json:"failureHandling"` + RestorePermissions bool `json:"restorePermissions"` + SkipReduce bool `json:"skipReduce"` + ToggleFeatures Toggles `json:"toggleFeatures"` + Parallelism Parallelism `json:"parallelism"` + Repo repository.Options `json:"repo"` + BackupItemExtensions []extensions.CorsoItemExtensionFactory `json:"-"` } type Parallelism struct { @@ -49,6 +51,7 @@ func Defaults() Options { CollectionBuffer: 4, ItemFetch: 4, }, + BackupItemExtensions: []extensions.CorsoItemExtensionFactory{}, } }