Add corso extension package

This commit is contained in:
Abhishek Pandey 2023-06-30 00:08:42 -07:00
parent 43d4a4d5e8
commit f94ae48fca
4 changed files with 443 additions and 2 deletions

View File

@ -57,8 +57,8 @@ func addS3Commands(cmd *cobra.Command) *cobra.Command {
// More generic and more frequently used flags take precedence.
fs.StringVar(&bucket, "bucket", "", "Name of S3 bucket for repo. (required)")
fs.StringVar(&prefix, "prefix", "", "Repo prefix within bucket.")
fs.StringVar(&endpoint, "endpoint", "s3.amazonaws.com", "S3 service endpoint.")
fs.BoolVar(&doNotUseTLS, "disable-tls", false, "Disable TLS (HTTPS)")
fs.StringVar(&endpoint, "endpoint", "127.0.0.1:9000", "S3 service endpoint.")
fs.BoolVar(&doNotUseTLS, "disable-tls", true, "Disable TLS (HTTPS)")
fs.BoolVar(&doNotVerifyTLS, "disable-tls-verification", false, "Disable TLS (HTTPS) certificate verification.")
// In general, we don't want to expose this flag to users and have them mistake it

View File

@ -1044,3 +1044,10 @@ func updateFolderWithinDrive(
return nil
}
// ExtensionInfo describes extension data associated with an item
// TODO: Expose this store behind an interface which can synchrnoize access to the
// underlying map.
type ExtensionInfo struct {
Data map[string]any `json:"data,omitempty"`
}

View File

@ -0,0 +1,67 @@
package extensions
import (
"context"
"io"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// Extension client interface
type CorsoItemExtension interface {
io.ReadCloser
}
type CorsoItemExtensionFactory func(
context.Context,
io.ReadCloser,
details.ItemInfo,
*details.ExtensionInfo,
) (CorsoItemExtension, error)
// AddItemExtensions wraps provided readcloser with extensions
// supplied via factory
func AddItemExtensions(
ctx context.Context,
rc io.ReadCloser,
info details.ItemInfo,
factories []CorsoItemExtensionFactory,
) (CorsoItemExtension, *details.ExtensionInfo, error) {
// TODO: move to validate
if rc == nil {
return nil, nil, clues.New("nil inner readcloser")
}
if len(factories) == 0 {
return nil, nil, clues.New("no extensions supplied")
}
ctx = clues.Add(ctx, "num_extensions", len(factories))
extInfo := &details.ExtensionInfo{
Data: make(map[string]any),
}
logger.Ctx(ctx).Info("adding extensions")
for _, factory := range factories {
if factory == nil {
return nil, nil, clues.New("nil extension factory")
}
extRc, err := factory(ctx, rc, info, extInfo)
if err != nil {
return nil, nil, clues.Wrap(err, "creating extension")
}
rc = extRc
}
logger.Ctx(ctx).Info("added extensions")
// TODO: Add an outermost extension for logging & metrics
return rc, extInfo, nil
}

View File

@ -0,0 +1,367 @@
package extensions
// Tests for extensions.go
import (
"bytes"
"context"
"hash/crc32"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// Temporary, testing purposes only
type MockExtension struct {
// TODO: Add cumlulative crc32 checksum
numBytes int
crc32 uint32
info details.ItemInfo
extInfo *details.ExtensionInfo
innerRc io.ReadCloser
ctx context.Context
failOnRead bool
failOnClose bool
}
func (me *MockExtension) Read(p []byte) (int, error) {
if me.failOnRead {
return 0, clues.New("mock read error")
}
n, err := me.innerRc.Read(p)
if err != nil && err != io.EOF {
logger.CtxErr(me.ctx, err).Error("inner read error")
return n, err
}
me.numBytes += n
me.crc32 = crc32.Update(me.crc32, crc32.IEEETable, p[:n])
if err == io.EOF {
logger.Ctx(me.ctx).Info("mock extension reached EOF")
me.extInfo.Data["numBytes"] = me.numBytes
me.extInfo.Data["crc32"] = me.crc32
}
return n, err
}
func (me *MockExtension) Close() error {
if me.failOnClose {
return clues.New("mock close error")
}
err := me.innerRc.Close()
if err != nil {
return err
}
me.extInfo.Data["numBytes"] = me.numBytes
me.extInfo.Data["crc32"] = me.crc32
logger.Ctx(me.ctx).Infow(
"mock extension closed",
"numBytes", me.numBytes, "crc32", me.crc32)
return nil
}
func NewMockExtension(
ctx context.Context,
rc io.ReadCloser,
info details.ItemInfo,
extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
return &MockExtension{
ctx: ctx,
innerRc: rc,
info: info,
extInfo: extInfo,
}, nil
}
type ExtensionsUnitSuite struct {
tester.Suite
}
func TestExtensionsUnitSuite(t *testing.T) {
suite.Run(t, &ExtensionsUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// func readFrom(rc io.ReadCloser) error {
// defer rc.Close()
// p := make([]byte, 4)
// for {
// _, err := rc.Read(p)
// if err == io.EOF {
// break
// }
// if err != nil {
// return err
// }
// }
// return nil
// }
func (suite *ExtensionsUnitSuite) TestAddItemExtensions() {
type outputValidationFunc func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool
var (
testRc = io.NopCloser(bytes.NewReader([]byte("some data")))
testItemInfo = details.ItemInfo{
OneDrive: &details.OneDriveInfo{
DriveID: "driveID",
},
}
)
table := []struct {
name string
factories []CorsoItemExtensionFactory
rc io.ReadCloser
validateOutputs outputValidationFunc
}{
{
name: "happy path",
factories: []CorsoItemExtensionFactory{
func(
ctx context.Context,
rc io.ReadCloser,
info details.ItemInfo,
extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
return NewMockExtension(ctx, rc, info, extInfo)
},
},
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err == nil && extRc != nil && extInfo != nil
},
},
{
name: "multiple valid factories",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
NewMockExtension,
NewMockExtension,
},
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err == nil && extRc != nil && extInfo != nil
},
},
{
name: "no factories supplied",
factories: nil,
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "factory slice contains nil",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
nil,
NewMockExtension,
},
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "factory call returns error",
factories: []CorsoItemExtensionFactory{
func(
ctx context.Context,
rc io.ReadCloser,
info details.ItemInfo,
extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
return nil, clues.New("creating extension")
},
},
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "one or more factory calls return error",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
func(
ctx context.Context,
rc io.ReadCloser,
info details.ItemInfo,
extInfo *details.ExtensionInfo,
) (CorsoItemExtension, error) {
return nil, clues.New("creating extension")
},
},
rc: testRc,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "nil inner rc",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
},
rc: nil,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "nil inner rc",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
},
rc: nil,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
{
name: "nil info",
factories: []CorsoItemExtensionFactory{
NewMockExtension,
},
rc: nil,
validateOutputs: func(
extRc io.ReadCloser,
extInfo *details.ExtensionInfo,
err error,
) bool {
return err != nil && extRc == nil && extInfo == nil
},
},
// verify wrap ordering
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
extRc, extInfo, err := AddItemExtensions(
ctx,
test.rc,
testItemInfo,
test.factories)
require.True(t, test.validateOutputs(extRc, extInfo, err))
})
}
}
// {
// name: "read_failure",
// factories: []CorsoItemExtensionFactory{
// func(
// ctx context.Context,
// rc io.ReadCloser,
// info details.ItemInfo,
// extInfo *details.ExtensionInfo,
// ) (CorsoItemExtension, error) {
// mockExt, _ := NewMockExtension(ctx, rc, info, extInfo)
// mockExt.failOnRead = true
// return mockExt, nil
// },
// },
// payload: []byte("some data"),
// expectedErr: require.Error,
// rc: io.NopCloser(bytes.NewReader([]byte("some data"))),
// },
// {
// name: "close_failure",
// factories: []CorsoItemExtensionFactory{
// func(
// ctx context.Context,
// rc io.ReadCloser,
// info details.ItemInfo,
// extInfo *details.ExtensionInfo,
// ) (CorsoItemExtension, error) {
// mockExt, _ := NewMockExtension(ctx, rc, info, extInfo)
// mockExt.failOnClose = true
// return mockExt, nil
// },
// },
// payload: []byte("some data"),
// expectedErr: require.Error,
// rc: io.NopCloser(bytes.NewReader([]byte("some data"))),
// },
// for _, test := range table {
// suite.Run(test.name, func() {
// t := suite.T()
// ctx, flush := tester.NewContext(t)
// defer flush()
// extRc, extInfo, err := AddItemExtensions(
// ctx,
// test.rc,
// details.ItemInfo{},
// test.factories)
// require.NoError(suite.T(), err)
// err = readFrom(extRc)
// require.NoError(suite.T(), err)
// require.Equal(suite.T(), len(test.payload), extInfo.Data["numBytes"])
// // verify crc32
// c := extInfo.Data["crc32"].(uint32)
// require.Equal(suite.T(), c, crc32.ChecksumIEEE(test.payload))
// })
// }