Compare commits
8 Commits
main
...
corso_exte
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0ae2e31ea | ||
|
|
b7ea2ae4df | ||
|
|
e8fb164f18 | ||
|
|
a7ae09072c | ||
|
|
49007969cc | ||
|
|
e1cf901d88 | ||
|
|
81bee48ed7 | ||
|
|
f94ae48fca |
@ -21,6 +21,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/extensions"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
@ -403,17 +404,21 @@ func readItemContents(
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
type driveStats struct {
|
||||
dirsRead int64
|
||||
itemsRead int64
|
||||
byteCount int64
|
||||
itemsFound int64
|
||||
dirsFound int64
|
||||
}
|
||||
|
||||
// populateItems iterates through items added to the collection
|
||||
// and uses the collection `itemReader` to read the item
|
||||
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||
var (
|
||||
byteCount int64
|
||||
itemsRead int64
|
||||
dirsRead int64
|
||||
itemsFound int64
|
||||
dirsFound int64
|
||||
wg sync.WaitGroup
|
||||
el = errs.Local()
|
||||
el = errs.Local()
|
||||
stats driveStats
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
// Retrieve the OneDrive folder path to set later in
|
||||
@ -445,122 +450,168 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func(ctx context.Context, item models.DriveItemable) {
|
||||
go func(item models.DriveItemable) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphoreCh }()
|
||||
|
||||
// Read the item
|
||||
var (
|
||||
itemID = ptr.Val(item.GetId())
|
||||
itemName = ptr.Val(item.GetName())
|
||||
itemSize = ptr.Val(item.GetSize())
|
||||
itemInfo details.ItemInfo
|
||||
itemMeta io.ReadCloser
|
||||
itemMetaSize int
|
||||
metaFileName string
|
||||
metaSuffix string
|
||||
err error
|
||||
oc.populateDriveItem(
|
||||
ctx,
|
||||
parentPath,
|
||||
item,
|
||||
&stats,
|
||||
&extensions.ItemExtensionHandler{},
|
||||
oc.ctrl.BackupItemExtensions,
|
||||
errs,
|
||||
)
|
||||
|
||||
ctx = clues.Add(
|
||||
ctx,
|
||||
"item_id", itemID,
|
||||
"item_name", clues.Hide(itemName),
|
||||
"item_size", itemSize)
|
||||
|
||||
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
|
||||
|
||||
isFile := item.GetFile() != nil
|
||||
|
||||
if isFile {
|
||||
atomic.AddInt64(&itemsFound, 1)
|
||||
|
||||
metaFileName = itemID
|
||||
metaSuffix = metadata.MetaFileSuffix
|
||||
} else {
|
||||
atomic.AddInt64(&dirsFound, 1)
|
||||
|
||||
// metaFileName not set for directories so we get just ".dirmeta"
|
||||
metaSuffix = metadata.DirMetaFileSuffix
|
||||
}
|
||||
|
||||
// Fetch metadata for the file
|
||||
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
|
||||
if err != nil {
|
||||
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
|
||||
return
|
||||
}
|
||||
|
||||
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
|
||||
|
||||
ctx = clues.Add(ctx, "item_info", itemInfo)
|
||||
|
||||
if isFile {
|
||||
dataSuffix := metadata.DataFileSuffix
|
||||
|
||||
// Construct a new lazy readCloser to feed to the collection consumer.
|
||||
// This ensures that downloads won't be attempted unless that consumer
|
||||
// attempts to read bytes. Assumption is that kopia will check things
|
||||
// like file modtimes before attempting to read.
|
||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// display/log the item download
|
||||
progReader, _ := observe.ItemProgress(
|
||||
ctx,
|
||||
itemData,
|
||||
observe.ItemBackupMsg,
|
||||
clues.Hide(itemName+dataSuffix),
|
||||
itemSize)
|
||||
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
oc.data <- &Item{
|
||||
id: itemID + dataSuffix,
|
||||
data: itemReader,
|
||||
info: itemInfo,
|
||||
}
|
||||
}
|
||||
|
||||
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
progReader, _ := observe.ItemProgress(
|
||||
ctx,
|
||||
itemMeta,
|
||||
observe.ItemBackupMsg,
|
||||
clues.Hide(itemName+metaSuffix),
|
||||
int64(itemMetaSize))
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
oc.data <- &metadata.Item{
|
||||
ID: metaFileName + metaSuffix,
|
||||
Data: metaReader,
|
||||
// Metadata file should always use the latest time as
|
||||
// permissions change does not update mod time.
|
||||
Mod: time.Now(),
|
||||
}
|
||||
|
||||
// Item read successfully, add to collection
|
||||
if isFile {
|
||||
atomic.AddInt64(&itemsRead, 1)
|
||||
} else {
|
||||
atomic.AddInt64(&dirsRead, 1)
|
||||
}
|
||||
|
||||
// byteCount iteration
|
||||
atomic.AddInt64(&byteCount, itemSize)
|
||||
|
||||
folderProgress <- struct{}{}
|
||||
}(ctx, item)
|
||||
}(item) // TODO: is copy okay here?
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount)
|
||||
oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
|
||||
}
|
||||
|
||||
func (oc *Collection) populateDriveItem(
|
||||
ctx context.Context,
|
||||
parentPath *path.Builder,
|
||||
item models.DriveItemable,
|
||||
stats *driveStats,
|
||||
aie extensions.AddItemExtensioner,
|
||||
factories []extensions.CorsoItemExtensionFactory,
|
||||
errs *fault.Bus,
|
||||
) {
|
||||
var (
|
||||
el = errs.Local()
|
||||
itemID = ptr.Val(item.GetId())
|
||||
itemName = ptr.Val(item.GetName())
|
||||
itemSize = ptr.Val(item.GetSize())
|
||||
itemInfo details.ItemInfo
|
||||
itemMeta io.ReadCloser
|
||||
itemMetaSize int
|
||||
metaFileName string
|
||||
metaSuffix string
|
||||
err error
|
||||
)
|
||||
|
||||
ctx = clues.Add(
|
||||
ctx,
|
||||
"item_id", itemID,
|
||||
"item_name", clues.Hide(itemName),
|
||||
"item_size", itemSize)
|
||||
|
||||
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
|
||||
|
||||
isFile := item.GetFile() != nil
|
||||
|
||||
if isFile {
|
||||
atomic.AddInt64(&stats.itemsFound, 1)
|
||||
|
||||
metaFileName = itemID
|
||||
metaSuffix = metadata.MetaFileSuffix
|
||||
} else {
|
||||
atomic.AddInt64(&stats.dirsFound, 1)
|
||||
|
||||
// metaFileName not set for directories so we get just ".dirmeta"
|
||||
metaSuffix = metadata.DirMetaFileSuffix
|
||||
}
|
||||
|
||||
// Fetch metadata for the file
|
||||
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
|
||||
if err != nil {
|
||||
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
|
||||
return
|
||||
}
|
||||
|
||||
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
|
||||
|
||||
ctx = clues.Add(ctx, "item_info", itemInfo)
|
||||
|
||||
if isFile {
|
||||
dataSuffix := metadata.DataFileSuffix
|
||||
|
||||
// Construct a new lazy readCloser to feed to the collection consumer.
|
||||
// This ensures that downloads won't be attempted unless that consumer
|
||||
// attempts to read bytes. Assumption is that kopia will check things
|
||||
// like file modtimes before attempting to read.
|
||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if aie != nil && len(factories) != 0 {
|
||||
logger.Ctx(ctx).Info("enabling drive item extensions")
|
||||
|
||||
extRc, extInfo, err := aie.AddItemExtensions(
|
||||
ctx,
|
||||
itemData,
|
||||
itemInfo,
|
||||
oc.ctrl.BackupItemExtensions)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "adding item extensions")
|
||||
}
|
||||
|
||||
if extInfo == nil {
|
||||
return nil, clues.New("nil extension info")
|
||||
}
|
||||
|
||||
if extRc == nil {
|
||||
return nil, clues.New("nil extension reader")
|
||||
}
|
||||
|
||||
itemInfo.OneDrive.Extension = extInfo
|
||||
itemData = extRc
|
||||
} else {
|
||||
logger.Ctx(ctx).Info("drive item extensions disabled")
|
||||
}
|
||||
|
||||
// display/log the item download
|
||||
progReader, _ := observe.ItemProgress(
|
||||
ctx,
|
||||
itemData,
|
||||
observe.ItemBackupMsg,
|
||||
clues.Hide(itemName+dataSuffix),
|
||||
itemSize)
|
||||
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
oc.data <- &Item{
|
||||
id: itemID + dataSuffix,
|
||||
data: itemReader,
|
||||
info: itemInfo,
|
||||
}
|
||||
}
|
||||
|
||||
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
progReader, _ := observe.ItemProgress(
|
||||
ctx,
|
||||
itemMeta,
|
||||
observe.ItemBackupMsg,
|
||||
clues.Hide(itemName+metaSuffix),
|
||||
int64(itemMetaSize))
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
oc.data <- &metadata.Item{
|
||||
ID: metaFileName + metaSuffix,
|
||||
Data: metaReader,
|
||||
// Metadata file should always use the latest time as
|
||||
// permissions change does not update mod time.
|
||||
Mod: time.Now(),
|
||||
}
|
||||
|
||||
// Item read successfully, add to collection
|
||||
if isFile {
|
||||
atomic.AddInt64(&stats.itemsRead, 1)
|
||||
} else {
|
||||
atomic.AddInt64(&stats.dirsRead, 1)
|
||||
}
|
||||
|
||||
atomic.AddInt64(&stats.byteCount, itemSize)
|
||||
}
|
||||
|
||||
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {
|
||||
|
||||
@ -978,16 +978,17 @@ func (i *SharePointInfo) updateFolder(f *FolderInfo) error {
|
||||
|
||||
// OneDriveInfo describes a oneDrive item
|
||||
type OneDriveInfo struct {
|
||||
Created time.Time `json:"created,omitempty"`
|
||||
DriveID string `json:"driveID,omitempty"`
|
||||
DriveName string `json:"driveName,omitempty"`
|
||||
IsMeta bool `json:"isMeta,omitempty"`
|
||||
ItemName string `json:"itemName,omitempty"`
|
||||
ItemType ItemType `json:"itemType,omitempty"`
|
||||
Modified time.Time `json:"modified,omitempty"`
|
||||
Owner string `json:"owner,omitempty"`
|
||||
ParentPath string `json:"parentPath"`
|
||||
Size int64 `json:"size,omitempty"`
|
||||
Created time.Time `json:"created,omitempty"`
|
||||
DriveID string `json:"driveID,omitempty"`
|
||||
DriveName string `json:"driveName,omitempty"`
|
||||
IsMeta bool `json:"isMeta,omitempty"`
|
||||
ItemName string `json:"itemName,omitempty"`
|
||||
ItemType ItemType `json:"itemType,omitempty"`
|
||||
Modified time.Time `json:"modified,omitempty"`
|
||||
Owner string `json:"owner,omitempty"`
|
||||
ParentPath string `json:"parentPath"`
|
||||
Size int64 `json:"size,omitempty"`
|
||||
Extension *ExtensionInfo `json:"extensionData,omitempty"`
|
||||
}
|
||||
|
||||
// Headers returns the human-readable names of properties in a OneDriveInfo
|
||||
@ -1044,3 +1045,10 @@ func updateFolderWithinDrive(
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtensionInfo describes extension data associated with an item
|
||||
// TODO: Expose this store behind an interface which can synchrnoize access to the
|
||||
// underlying map.
|
||||
type ExtensionInfo struct {
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
@ -8,18 +8,20 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/dttm"
|
||||
"github.com/alcionai/corso/src/pkg/control/repository"
|
||||
"github.com/alcionai/corso/src/pkg/extensions"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
// Options holds the optional configurations for a process
|
||||
type Options struct {
|
||||
DisableMetrics bool `json:"disableMetrics"`
|
||||
FailureHandling FailurePolicy `json:"failureHandling"`
|
||||
RestorePermissions bool `json:"restorePermissions"`
|
||||
SkipReduce bool `json:"skipReduce"`
|
||||
ToggleFeatures Toggles `json:"toggleFeatures"`
|
||||
Parallelism Parallelism `json:"parallelism"`
|
||||
Repo repository.Options `json:"repo"`
|
||||
DisableMetrics bool `json:"disableMetrics"`
|
||||
FailureHandling FailurePolicy `json:"failureHandling"`
|
||||
RestorePermissions bool `json:"restorePermissions"`
|
||||
SkipReduce bool `json:"skipReduce"`
|
||||
ToggleFeatures Toggles `json:"toggleFeatures"`
|
||||
Parallelism Parallelism `json:"parallelism"`
|
||||
Repo repository.Options `json:"repo"`
|
||||
BackupItemExtensions []extensions.CorsoItemExtensionFactory `json:"-"`
|
||||
}
|
||||
|
||||
type Parallelism struct {
|
||||
@ -49,6 +51,7 @@ func Defaults() Options {
|
||||
CollectionBuffer: 4,
|
||||
ItemFetch: 4,
|
||||
},
|
||||
BackupItemExtensions: []extensions.CorsoItemExtensionFactory{},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
126
src/pkg/extensions/extensions.go
Normal file
126
src/pkg/extensions/extensions.go
Normal file
@ -0,0 +1,126 @@
|
||||
package extensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
// Extension client interface
|
||||
type CorsoItemExtension interface {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
type CorsoItemExtensionFactory func(
|
||||
context.Context,
|
||||
io.ReadCloser,
|
||||
details.ItemInfo,
|
||||
*details.ExtensionInfo,
|
||||
) (CorsoItemExtension, error)
|
||||
|
||||
// Thin wrapper for runtime logging & metrics around extensions
|
||||
type loggerExtension struct {
|
||||
info details.ItemInfo
|
||||
innerRc io.ReadCloser
|
||||
ctx context.Context
|
||||
extInfo *details.ExtensionInfo
|
||||
}
|
||||
|
||||
func NewLoggerExtension(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
extInfo *details.ExtensionInfo,
|
||||
) (CorsoItemExtension, error) {
|
||||
return &loggerExtension{
|
||||
ctx: ctx,
|
||||
innerRc: rc,
|
||||
info: info,
|
||||
extInfo: extInfo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *loggerExtension) Read(p []byte) (int, error) {
|
||||
n, err := l.innerRc.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
logger.CtxErr(l.ctx, err).Error("inner read")
|
||||
return n, err
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
logger.Ctx(l.ctx).Debug("corso extensions: EOF")
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (l *loggerExtension) Close() error {
|
||||
err := l.innerRc.Close()
|
||||
if err != nil {
|
||||
logger.CtxErr(l.ctx, err).Error("inner close")
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Ctx(l.ctx).Info("corso extensions: closed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type AddItemExtensioner interface {
|
||||
AddItemExtensions(
|
||||
context.Context,
|
||||
io.ReadCloser,
|
||||
details.ItemInfo,
|
||||
[]CorsoItemExtensionFactory,
|
||||
) (io.ReadCloser, *details.ExtensionInfo, error)
|
||||
}
|
||||
|
||||
var _ AddItemExtensioner = &ItemExtensionHandler{}
|
||||
|
||||
type ItemExtensionHandler struct{}
|
||||
|
||||
// AddItemExtensions wraps provided readcloser with extensions
|
||||
// supplied via factory, with the first extension in slice being
|
||||
// the innermost one.
|
||||
func (eh *ItemExtensionHandler) AddItemExtensions(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
factories []CorsoItemExtensionFactory,
|
||||
) (io.ReadCloser, *details.ExtensionInfo, error) {
|
||||
if rc == nil {
|
||||
return nil, nil, clues.New("nil inner readcloser")
|
||||
}
|
||||
|
||||
if len(factories) == 0 {
|
||||
return nil, nil, clues.New("no extensions supplied")
|
||||
}
|
||||
|
||||
factories = append(factories, NewLoggerExtension)
|
||||
ctx = clues.Add(ctx, "num_extensions", len(factories))
|
||||
|
||||
extInfo := &details.ExtensionInfo{
|
||||
Data: make(map[string]any),
|
||||
}
|
||||
|
||||
for _, factory := range factories {
|
||||
if factory == nil {
|
||||
return nil, nil, clues.New("nil extension factory")
|
||||
}
|
||||
|
||||
extRc, err := factory(ctx, rc, info, extInfo)
|
||||
if err != nil {
|
||||
return nil, nil, clues.Wrap(err, "calling extension factory")
|
||||
}
|
||||
|
||||
rc = extRc
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Info("added extensions")
|
||||
|
||||
return rc, extInfo, nil
|
||||
}
|
||||
184
src/pkg/extensions/extensions_test.go
Normal file
184
src/pkg/extensions/extensions_test.go
Normal file
@ -0,0 +1,184 @@
|
||||
package extensions
|
||||
|
||||
// Tests for extensions.go
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
)
|
||||
|
||||
type ExtensionsUnitSuite struct {
|
||||
tester.Suite
|
||||
}
|
||||
|
||||
func TestExtensionsUnitSuite(t *testing.T) {
|
||||
suite.Run(t, &ExtensionsUnitSuite{Suite: tester.NewUnitSuite(t)})
|
||||
}
|
||||
|
||||
func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
|
||||
type outputValidationFunc func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool
|
||||
|
||||
var (
|
||||
testRc = io.NopCloser(bytes.NewReader([]byte("some data")))
|
||||
testItemInfo = details.ItemInfo{
|
||||
OneDrive: &details.OneDriveInfo{
|
||||
DriveID: "driveID",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
factories []CorsoItemExtensionFactory
|
||||
rc io.ReadCloser
|
||||
validateOutputs outputValidationFunc
|
||||
}{
|
||||
{
|
||||
name: "happy path",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
NewMockExtension,
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err == nil && extRc != nil && extInfo != nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple valid factories",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
NewMockExtension,
|
||||
NewMockExtension,
|
||||
NewMockExtension,
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err == nil && extRc != nil && extInfo != nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no factories supplied",
|
||||
factories: nil,
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "factory slice contains nil",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
NewMockExtension,
|
||||
nil,
|
||||
NewMockExtension,
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "factory call returns error",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
func(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
extInfo *details.ExtensionInfo,
|
||||
) (CorsoItemExtension, error) {
|
||||
return nil, clues.New("creating extension")
|
||||
},
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one or more factory calls return error",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
NewMockExtension,
|
||||
func(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
extInfo *details.ExtensionInfo,
|
||||
) (CorsoItemExtension, error) {
|
||||
return nil, clues.New("creating extension")
|
||||
},
|
||||
},
|
||||
rc: testRc,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "nil inner rc",
|
||||
factories: []CorsoItemExtensionFactory{
|
||||
NewMockExtension,
|
||||
},
|
||||
rc: nil,
|
||||
validateOutputs: func(
|
||||
extRc io.ReadCloser,
|
||||
extInfo *details.ExtensionInfo,
|
||||
err error,
|
||||
) bool {
|
||||
return err != nil && extRc == nil && extInfo == nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range table {
|
||||
suite.Run(test.name, func() {
|
||||
t := suite.T()
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
ith := &ItemExtensionHandler{}
|
||||
|
||||
extRc, extInfo, err := ith.AddItemExtensions(
|
||||
ctx,
|
||||
test.rc,
|
||||
testItemInfo,
|
||||
test.factories)
|
||||
require.True(t, test.validateOutputs(extRc, extInfo, err))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: tests for loggerExtension
|
||||
// TODO: Tests to verify RC wrapper ordering by AddItemExtensioner
|
||||
81
src/pkg/extensions/mock_extensions.go
Normal file
81
src/pkg/extensions/mock_extensions.go
Normal file
@ -0,0 +1,81 @@
|
||||
package extensions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
var _ CorsoItemExtension = &MockExtension{}
|
||||
|
||||
type MockExtension struct {
|
||||
numBytes int
|
||||
crc32 uint32
|
||||
info details.ItemInfo
|
||||
extInfo *details.ExtensionInfo
|
||||
innerRc io.ReadCloser
|
||||
ctx context.Context
|
||||
failOnRead bool
|
||||
failOnClose bool
|
||||
}
|
||||
|
||||
func (me *MockExtension) Read(p []byte) (int, error) {
|
||||
if me.failOnRead {
|
||||
return 0, clues.New("mock read error")
|
||||
}
|
||||
|
||||
n, err := me.innerRc.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
logger.CtxErr(me.ctx, err).Error("inner read error")
|
||||
return n, err
|
||||
}
|
||||
|
||||
me.numBytes += n
|
||||
me.crc32 = crc32.Update(me.crc32, crc32.IEEETable, p[:n])
|
||||
|
||||
if err == io.EOF {
|
||||
logger.Ctx(me.ctx).Debug("mock extension reached EOF")
|
||||
me.extInfo.Data["numBytes"] = me.numBytes
|
||||
me.extInfo.Data["crc32"] = me.crc32
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (me *MockExtension) Close() error {
|
||||
if me.failOnClose {
|
||||
return clues.New("mock close error")
|
||||
}
|
||||
|
||||
err := me.innerRc.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
me.extInfo.Data["numBytes"] = me.numBytes
|
||||
me.extInfo.Data["crc32"] = me.crc32
|
||||
logger.Ctx(me.ctx).Infow(
|
||||
"mock extension closed",
|
||||
"numBytes", me.numBytes, "crc32", me.crc32)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewMockExtension(
|
||||
ctx context.Context,
|
||||
rc io.ReadCloser,
|
||||
info details.ItemInfo,
|
||||
extInfo *details.ExtensionInfo,
|
||||
) (CorsoItemExtension, error) {
|
||||
return &MockExtension{
|
||||
ctx: ctx,
|
||||
innerRc: rc,
|
||||
info: info,
|
||||
extInfo: extInfo,
|
||||
}, nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user