Compare commits

...

8 Commits

Author SHA1 Message Date
Abhishek Pandey
b0ae2e31ea Move mocks to a seperate file 2023-06-30 09:47:33 -07:00
Abhishek Pandey
b7ea2ae4df Minor nits 2023-06-30 09:26:13 -07:00
Abhishek Pandey
e8fb164f18 Refactor drive item reader 2023-06-30 04:28:09 -07:00
Abhishek Pandey
a7ae09072c Minor cleanup 2023-06-30 02:23:19 -07:00
Abhishek Pandey
49007969cc Add logger extension 2023-06-30 02:04:07 -07:00
Abhishek Pandey
e1cf901d88 Add AddItemExtensioner interface 2023-06-30 01:35:22 -07:00
Abhishek Pandey
81bee48ed7 Remove duplicate tests 2023-06-30 00:11:23 -07:00
Abhishek Pandey
f94ae48fca Add corso extension package 2023-06-30 00:08:42 -07:00
6 changed files with 582 additions and 129 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -403,17 +404,21 @@ func readItemContents(
return rc, nil
}
// driveStats aggregates progress counters for a collection's item
// population. The counters are written concurrently (one goroutine per
// item), so every field must be updated only via sync/atomic operations.
type driveStats struct {
	dirsRead   int64 // directories fully processed
	itemsRead  int64 // files fully processed
	byteCount  int64 // total reported size of processed items
	itemsFound int64 // files discovered, whether or not they were processed
	dirsFound  int64 // directories discovered, whether or not they were processed
}
// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
var (
byteCount int64
itemsRead int64
dirsRead int64
itemsFound int64
dirsFound int64
wg sync.WaitGroup
el = errs.Local()
el = errs.Local()
stats driveStats
wg sync.WaitGroup
)
// Retrieve the OneDrive folder path to set later in
@ -445,122 +450,168 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
wg.Add(1)
go func(ctx context.Context, item models.DriveItemable) {
go func(item models.DriveItemable) {
defer wg.Done()
defer func() { <-semaphoreCh }()
// Read the item
var (
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
itemSize = ptr.Val(item.GetSize())
itemInfo details.ItemInfo
itemMeta io.ReadCloser
itemMetaSize int
metaFileName string
metaSuffix string
err error
oc.populateDriveItem(
ctx,
parentPath,
item,
&stats,
&extensions.ItemExtensionHandler{},
oc.ctrl.BackupItemExtensions,
errs,
)
ctx = clues.Add(
ctx,
"item_id", itemID,
"item_name", clues.Hide(itemName),
"item_size", itemSize)
item.SetParentReference(setName(item.GetParentReference(), oc.driveName))
isFile := item.GetFile() != nil
if isFile {
atomic.AddInt64(&itemsFound, 1)
metaFileName = itemID
metaSuffix = metadata.MetaFileSuffix
} else {
atomic.AddInt64(&dirsFound, 1)
// metaFileName not set for directories so we get just ".dirmeta"
metaSuffix = metadata.DirMetaFileSuffix
}
// Fetch metadata for the file
itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
return
}
itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)
ctx = clues.Add(ctx, "item_info", itemInfo)
if isFile {
dataSuffix := metadata.DataFileSuffix
// Construct a new lazy readCloser to feed to the collection consumer.
// This ensures that downloads won't be attempted unless that consumer
// attempts to read bytes. Assumption is that kopia will check things
// like file modtimes before attempting to read.
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
if err != nil {
return nil, err
}
// display/log the item download
progReader, _ := observe.ItemProgress(
ctx,
itemData,
observe.ItemBackupMsg,
clues.Hide(itemName+dataSuffix),
itemSize)
return progReader, nil
})
oc.data <- &Item{
id: itemID + dataSuffix,
data: itemReader,
info: itemInfo,
}
}
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
progReader, _ := observe.ItemProgress(
ctx,
itemMeta,
observe.ItemBackupMsg,
clues.Hide(itemName+metaSuffix),
int64(itemMetaSize))
return progReader, nil
})
oc.data <- &metadata.Item{
ID: metaFileName + metaSuffix,
Data: metaReader,
// Metadata file should always use the latest time as
// permissions change does not update mod time.
Mod: time.Now(),
}
// Item read successfully, add to collection
if isFile {
atomic.AddInt64(&itemsRead, 1)
} else {
atomic.AddInt64(&dirsRead, 1)
}
// byteCount iteration
atomic.AddInt64(&byteCount, itemSize)
folderProgress <- struct{}{}
}(ctx, item)
}(item) // TODO: is copy okay here?
}
wg.Wait()
oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount)
oc.reportAsCompleted(ctx, int(stats.itemsFound), int(stats.itemsRead), stats.byteCount)
}
// populateDriveItem processes a single drive item (file or directory):
// it fetches the item's metadata, queues a lazy content reader (files
// only) and a metadata entry onto the collection's data channel, and
// accumulates progress counters into stats.
//
// parentPath locates the item for ItemInfo augmentation; aie and
// factories control optional item-extension wrapping of the content
// stream; recoverable failures (eg: metadata download) are reported via
// errs and cause an early return without queueing the item.
//
// This runs concurrently for many items, hence the atomic stats updates.
func (oc *Collection) populateDriveItem(
	ctx context.Context,
	parentPath *path.Builder,
	item models.DriveItemable,
	stats *driveStats,
	aie extensions.AddItemExtensioner,
	factories []extensions.CorsoItemExtensionFactory,
	errs *fault.Bus,
) {
	var (
		el           = errs.Local()
		itemID       = ptr.Val(item.GetId())
		itemName     = ptr.Val(item.GetName())
		itemSize     = ptr.Val(item.GetSize())
		itemInfo     details.ItemInfo
		itemMeta     io.ReadCloser
		itemMetaSize int
		metaFileName string
		metaSuffix   string
		err          error
	)

	ctx = clues.Add(
		ctx,
		"item_id", itemID,
		"item_name", clues.Hide(itemName),
		"item_size", itemSize)

	item.SetParentReference(setName(item.GetParentReference(), oc.driveName))

	isFile := item.GetFile() != nil

	if isFile {
		atomic.AddInt64(&stats.itemsFound, 1)

		metaFileName = itemID
		metaSuffix = metadata.MetaFileSuffix
	} else {
		atomic.AddInt64(&stats.dirsFound, 1)

		// metaFileName not set for directories so we get just ".dirmeta"
		metaSuffix = metadata.DirMetaFileSuffix
	}

	// Fetch metadata for the file
	itemMeta, itemMetaSize, err = downloadItemMeta(ctx, oc.handler, oc.driveID, item)
	if err != nil {
		el.AddRecoverable(ctx, clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation))
		return
	}

	itemInfo = oc.handler.AugmentItemInfo(itemInfo, item, itemSize, parentPath)

	ctx = clues.Add(ctx, "item_info", itemInfo)

	if isFile {
		dataSuffix := metadata.DataFileSuffix

		// Construct a new lazy readCloser to feed to the collection consumer.
		// This ensures that downloads won't be attempted unless that consumer
		// attempts to read bytes. Assumption is that kopia will check things
		// like file modtimes before attempting to read.
		itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
			itemData, err := oc.getDriveItemContent(ctx, oc.driveID, item, errs)
			if err != nil {
				return nil, err
			}

			if aie != nil && len(factories) != 0 {
				logger.Ctx(ctx).Info("enabling drive item extensions")

				// BUGFIX: previously this passed oc.ctrl.BackupItemExtensions,
				// silently ignoring the factories parameter that the guard
				// above checks. Use the parameter so callers control which
				// extensions apply.
				extRc, extInfo, err := aie.AddItemExtensions(
					ctx,
					itemData,
					itemInfo,
					factories)
				if err != nil {
					return nil, clues.Wrap(err, "adding item extensions")
				}

				if extInfo == nil {
					return nil, clues.New("nil extension info")
				}

				if extRc == nil {
					return nil, clues.New("nil extension reader")
				}

				// itemInfo was already copied into the queued Item below, but
				// OneDrive is a pointer, so this mutation remains visible.
				// NOTE(review): assumes AugmentItemInfo populated a non-nil
				// OneDrive — confirm for all handler implementations.
				itemInfo.OneDrive.Extension = extInfo
				itemData = extRc
			} else {
				logger.Ctx(ctx).Info("drive item extensions disabled")
			}

			// display/log the item download
			progReader, _ := observe.ItemProgress(
				ctx,
				itemData,
				observe.ItemBackupMsg,
				clues.Hide(itemName+dataSuffix),
				itemSize)

			return progReader, nil
		})

		oc.data <- &Item{
			id:   itemID + dataSuffix,
			data: itemReader,
			info: itemInfo,
		}
	}

	metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
		progReader, _ := observe.ItemProgress(
			ctx,
			itemMeta,
			observe.ItemBackupMsg,
			clues.Hide(itemName+metaSuffix),
			int64(itemMetaSize))

		return progReader, nil
	})

	oc.data <- &metadata.Item{
		ID:   metaFileName + metaSuffix,
		Data: metaReader,
		// Metadata file should always use the latest time as
		// permissions change does not update mod time.
		Mod: time.Now(),
	}

	// Item read successfully, add to collection
	if isFile {
		atomic.AddInt64(&stats.itemsRead, 1)
	} else {
		atomic.AddInt64(&stats.dirsRead, 1)
	}

	atomic.AddInt64(&stats.byteCount, itemSize)
}
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64) {

View File

@ -978,16 +978,17 @@ func (i *SharePointInfo) updateFolder(f *FolderInfo) error {
// OneDriveInfo describes a oneDrive item
type OneDriveInfo struct {
Created time.Time `json:"created,omitempty"`
DriveID string `json:"driveID,omitempty"`
DriveName string `json:"driveName,omitempty"`
IsMeta bool `json:"isMeta,omitempty"`
ItemName string `json:"itemName,omitempty"`
ItemType ItemType `json:"itemType,omitempty"`
Modified time.Time `json:"modified,omitempty"`
Owner string `json:"owner,omitempty"`
ParentPath string `json:"parentPath"`
Size int64 `json:"size,omitempty"`
Created time.Time `json:"created,omitempty"`
DriveID string `json:"driveID,omitempty"`
DriveName string `json:"driveName,omitempty"`
IsMeta bool `json:"isMeta,omitempty"`
ItemName string `json:"itemName,omitempty"`
ItemType ItemType `json:"itemType,omitempty"`
Modified time.Time `json:"modified,omitempty"`
Owner string `json:"owner,omitempty"`
ParentPath string `json:"parentPath"`
Size int64 `json:"size,omitempty"`
Extension *ExtensionInfo `json:"extensionData,omitempty"`
}
// Headers returns the human-readable names of properties in a OneDriveInfo
@ -1044,3 +1045,10 @@ func updateFolderWithinDrive(
return nil
}
// ExtensionInfo describes extension data associated with an item
// TODO: Expose this store behind an interface which can synchronize access to the
// underlying map.
type ExtensionInfo struct {
	// Data is a free-form key/value store that item extensions write
	// results into (eg: a checksum extension recording "numBytes"/"crc32").
	Data map[string]any `json:"data,omitempty"`
}

View File

@ -8,18 +8,20 @@ import (
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/logger"
)
// Options holds the optional configurations for a process
type Options struct {
DisableMetrics bool `json:"disableMetrics"`
FailureHandling FailurePolicy `json:"failureHandling"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ToggleFeatures Toggles `json:"toggleFeatures"`
Parallelism Parallelism `json:"parallelism"`
Repo repository.Options `json:"repo"`
DisableMetrics bool `json:"disableMetrics"`
FailureHandling FailurePolicy `json:"failureHandling"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ToggleFeatures Toggles `json:"toggleFeatures"`
Parallelism Parallelism `json:"parallelism"`
Repo repository.Options `json:"repo"`
BackupItemExtensions []extensions.CorsoItemExtensionFactory `json:"-"`
}
type Parallelism struct {
@ -49,6 +51,7 @@ func Defaults() Options {
CollectionBuffer: 4,
ItemFetch: 4,
},
BackupItemExtensions: []extensions.CorsoItemExtensionFactory{},
}
}

View File

@ -0,0 +1,126 @@
package extensions
import (
"context"
"io"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// Extension client interface
//
// CorsoItemExtension is implemented by item extensions: a ReadCloser that
// wraps an item's content stream and may observe or record data as the
// stream is consumed.
type CorsoItemExtension interface {
	io.ReadCloser
}

// CorsoItemExtensionFactory constructs a CorsoItemExtension wrapping the
// supplied ReadCloser. The ItemInfo describes the item being read; the
// *ExtensionInfo is a shared store the extension may write results into.
type CorsoItemExtensionFactory func(
	context.Context,
	io.ReadCloser,
	details.ItemInfo,
	*details.ExtensionInfo,
) (CorsoItemExtension, error)
// Thin wrapper for runtime logging & metrics around extensions
type loggerExtension struct {
	// info is retained but not currently read by Read/Close.
	info details.ItemInfo
	// innerRc is the wrapped stream that Read/Close delegate to.
	innerRc io.ReadCloser
	// ctx is stored for logging from Read/Close. NOTE(review): storing a
	// context in a struct is discouraged by Go convention — confirm its
	// lifetime matches the lifetime of the item read.
	ctx     context.Context
	extInfo *details.ExtensionInfo
}

// NewLoggerExtension is a CorsoItemExtensionFactory that wraps rc in a
// loggerExtension. It never returns an error.
func NewLoggerExtension(
	ctx context.Context,
	rc io.ReadCloser,
	info details.ItemInfo,
	extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
	return &loggerExtension{
		ctx:     ctx,
		innerRc: rc,
		info:    info,
		extInfo: extInfo,
	}, nil
}
// Read delegates to the wrapped readcloser, logging non-EOF read errors
// as errors and EOF at debug level. The byte count and error from the
// inner reader are returned unchanged.
func (l *loggerExtension) Read(p []byte) (int, error) {
	count, readErr := l.innerRc.Read(p)

	switch {
	case readErr == io.EOF:
		logger.Ctx(l.ctx).Debug("corso extensions: EOF")
	case readErr != nil:
		logger.CtxErr(l.ctx, readErr).Error("inner read")
	}

	return count, readErr
}
// Close closes the wrapped readcloser. A failure is logged and returned;
// a successful close emits an info line and returns nil.
func (l *loggerExtension) Close() error {
	if closeErr := l.innerRc.Close(); closeErr != nil {
		logger.CtxErr(l.ctx, closeErr).Error("inner close")
		return closeErr
	}

	logger.Ctx(l.ctx).Info("corso extensions: closed")

	return nil
}
// AddItemExtensioner wraps an item's content stream with a chain of
// extension readclosers built from the supplied factories, returning the
// outermost reader and the shared store the extensions write into.
type AddItemExtensioner interface {
	AddItemExtensions(
		context.Context,
		io.ReadCloser,
		details.ItemInfo,
		[]CorsoItemExtensionFactory,
	) (io.ReadCloser, *details.ExtensionInfo, error)
}

// compile-time interface satisfaction check
var _ AddItemExtensioner = &ItemExtensionHandler{}

// ItemExtensionHandler is the stateless default AddItemExtensioner.
type ItemExtensionHandler struct{}
// AddItemExtensions wraps provided readcloser with extensions
// supplied via factory, with the first extension in slice being
// the innermost one. A logging extension is always added as the
// outermost wrapper.
//
// Returns the outermost readcloser and the ExtensionInfo store shared by
// every extension in the chain. Errors if rc is nil, no factories are
// supplied, any factory is nil, or a factory call fails.
func (eh *ItemExtensionHandler) AddItemExtensions(
	ctx context.Context,
	rc io.ReadCloser,
	info details.ItemInfo,
	factories []CorsoItemExtensionFactory,
) (io.ReadCloser, *details.ExtensionInfo, error) {
	if rc == nil {
		return nil, nil, clues.New("nil inner readcloser")
	}

	if len(factories) == 0 {
		return nil, nil, clues.New("no extensions supplied")
	}

	// BUGFIX: build a fresh slice instead of appending to factories.
	// append may write into the caller's backing array when spare capacity
	// exists, leaking the logger factory into the caller's slice.
	withLogger := make([]CorsoItemExtensionFactory, 0, len(factories)+1)
	withLogger = append(withLogger, factories...)
	withLogger = append(withLogger, NewLoggerExtension)

	ctx = clues.Add(ctx, "num_extensions", len(withLogger))

	// Shared by the whole chain; each extension may record per-item data.
	extInfo := &details.ExtensionInfo{
		Data: make(map[string]any),
	}

	// Wrap innermost-first: each iteration's output becomes the next
	// factory's input.
	for _, factory := range withLogger {
		if factory == nil {
			return nil, nil, clues.New("nil extension factory")
		}

		extRc, err := factory(ctx, rc, info, extInfo)
		if err != nil {
			return nil, nil, clues.Wrap(err, "calling extension factory")
		}

		rc = extRc
	}

	logger.Ctx(ctx).Info("added extensions")

	return rc, extInfo, nil
}

View File

@ -0,0 +1,184 @@
package extensions
// Tests for extensions.go
import (
"bytes"
"context"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
)
// ExtensionsUnitSuite hosts unit tests for the extensions package.
type ExtensionsUnitSuite struct {
	tester.Suite
}

// TestExtensionsUnitSuite wires ExtensionsUnitSuite into go test.
func TestExtensionsUnitSuite(t *testing.T) {
	suite.Run(t, &ExtensionsUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// TestAddItemExtensions exercises ItemExtensionHandler.AddItemExtensions
// across valid factory chains, missing/nil factories, failing factories,
// and a nil inner readcloser, via a table of per-case validation funcs.
func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
	// outputValidationFunc reports whether the outputs of AddItemExtensions
	// match a test case's expectations.
	type outputValidationFunc func(
		extRc io.ReadCloser,
		extInfo *details.ExtensionInfo,
		err error,
	) bool

	var (
		// NOTE(review): testRc is shared across subtests. That is safe while
		// no factory reads from it during wrapping, but would leak reader
		// state between cases if one did — confirm.
		testRc       = io.NopCloser(bytes.NewReader([]byte("some data")))
		testItemInfo = details.ItemInfo{
			OneDrive: &details.OneDriveInfo{
				DriveID: "driveID",
			},
		}
	)

	table := []struct {
		name            string
		factories       []CorsoItemExtensionFactory
		rc              io.ReadCloser
		validateOutputs outputValidationFunc
	}{
		{
			name: "happy path",
			factories: []CorsoItemExtensionFactory{
				NewMockExtension,
			},
			rc: testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err == nil && extRc != nil && extInfo != nil
			},
		},
		{
			name: "multiple valid factories",
			factories: []CorsoItemExtensionFactory{
				NewMockExtension,
				NewMockExtension,
				NewMockExtension,
			},
			rc: testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err == nil && extRc != nil && extInfo != nil
			},
		},
		{
			name:      "no factories supplied",
			factories: nil,
			rc:        testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err != nil && extRc == nil && extInfo == nil
			},
		},
		{
			// a nil entry anywhere in the slice should fail the whole call
			name: "factory slice contains nil",
			factories: []CorsoItemExtensionFactory{
				NewMockExtension,
				nil,
				NewMockExtension,
			},
			rc: testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err != nil && extRc == nil && extInfo == nil
			},
		},
		{
			name: "factory call returns error",
			factories: []CorsoItemExtensionFactory{
				func(
					ctx context.Context,
					rc io.ReadCloser,
					info details.ItemInfo,
					extInfo *details.ExtensionInfo,
				) (CorsoItemExtension, error) {
					return nil, clues.New("creating extension")
				},
			},
			rc: testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err != nil && extRc == nil && extInfo == nil
			},
		},
		{
			// a failure after earlier successes should still fail the call
			name: "one or more factory calls return error",
			factories: []CorsoItemExtensionFactory{
				NewMockExtension,
				func(
					ctx context.Context,
					rc io.ReadCloser,
					info details.ItemInfo,
					extInfo *details.ExtensionInfo,
				) (CorsoItemExtension, error) {
					return nil, clues.New("creating extension")
				},
			},
			rc: testRc,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err != nil && extRc == nil && extInfo == nil
			},
		},
		{
			name: "nil inner rc",
			factories: []CorsoItemExtensionFactory{
				NewMockExtension,
			},
			rc: nil,
			validateOutputs: func(
				extRc io.ReadCloser,
				extInfo *details.ExtensionInfo,
				err error,
			) bool {
				return err != nil && extRc == nil && extInfo == nil
			},
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			ith := &ItemExtensionHandler{}

			extRc, extInfo, err := ith.AddItemExtensions(
				ctx,
				test.rc,
				testItemInfo,
				test.factories)
			require.True(t, test.validateOutputs(extRc, extInfo, err))
		})
	}
}
// TODO: tests for loggerExtension
// TODO: Tests to verify RC wrapper ordering by AddItemExtensioner

View File

@ -0,0 +1,81 @@
package extensions
import (
"context"
"hash/crc32"
"io"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// compile-time check: MockExtension implements CorsoItemExtension.
var _ CorsoItemExtension = &MockExtension{}

// MockExtension is a test double for CorsoItemExtension. It wraps an item
// stream, tracking the number of bytes read and a running IEEE CRC32 of
// them, and can be configured to fail on Read or Close.
type MockExtension struct {
	numBytes int    // bytes read through the wrapper so far
	crc32    uint32 // running IEEE CRC32 of the bytes read
	// info is retained but not currently read by Read/Close.
	info details.ItemInfo
	// extInfo receives "numBytes" and "crc32" on EOF and on Close.
	extInfo *details.ExtensionInfo
	innerRc io.ReadCloser
	ctx     context.Context
	// failOnRead forces every Read call to error without touching innerRc.
	failOnRead bool
	// failOnClose forces Close to error without closing innerRc.
	failOnClose bool
}
// Read forwards to the wrapped readcloser while accumulating the byte
// count and CRC32 of everything read. On EOF the accumulated stats are
// published into the shared extension info. When failOnRead is set, every
// call fails immediately without touching the inner reader.
func (me *MockExtension) Read(p []byte) (int, error) {
	if me.failOnRead {
		return 0, clues.New("mock read error")
	}

	count, readErr := me.innerRc.Read(p)
	if readErr != nil && readErr != io.EOF {
		logger.CtxErr(me.ctx, readErr).Error("inner read error")
		return count, readErr
	}

	// Fold in only the bytes this call actually produced.
	me.numBytes += count
	me.crc32 = crc32.Update(me.crc32, crc32.IEEETable, p[:count])

	if readErr == io.EOF {
		logger.Ctx(me.ctx).Debug("mock extension reached EOF")

		me.extInfo.Data["numBytes"] = me.numBytes
		me.extInfo.Data["crc32"] = me.crc32
	}

	return count, readErr
}
// Close closes the wrapped readcloser and, on success, publishes the
// final byte count and CRC32 into the shared extension info. When
// failOnClose is set, the call errors without closing the inner reader.
func (me *MockExtension) Close() error {
	if me.failOnClose {
		return clues.New("mock close error")
	}

	if closeErr := me.innerRc.Close(); closeErr != nil {
		return closeErr
	}

	me.extInfo.Data["numBytes"] = me.numBytes
	me.extInfo.Data["crc32"] = me.crc32

	logger.Ctx(me.ctx).Infow(
		"mock extension closed",
		"numBytes", me.numBytes, "crc32", me.crc32)

	return nil
}
// NewMockExtension is a CorsoItemExtensionFactory that wraps rc in a
// MockExtension. The returned mock has failOnRead/failOnClose unset;
// assign those fields directly for failure scenarios. Never errors.
func NewMockExtension(
	ctx context.Context,
	rc io.ReadCloser,
	info details.ItemInfo,
	extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
	return &MockExtension{
		ctx:     ctx,
		innerRc: rc,
		info:    info,
		extInfo: extInfo,
	}, nil
}