From f94ae48fca6e46fad9b8b7060636e4eda518d0fb Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 30 Jun 2023 00:08:42 -0700 Subject: [PATCH] Add corso extension package --- src/cli/repo/s3.go | 4 +- src/pkg/backup/details/details.go | 7 + src/pkg/extensions/extensions.go | 67 +++++ src/pkg/extensions/extensions_test.go | 367 ++++++++++++++++++++++++++ 4 files changed, 443 insertions(+), 2 deletions(-) create mode 100644 src/pkg/extensions/extensions.go create mode 100644 src/pkg/extensions/extensions_test.go diff --git a/src/cli/repo/s3.go b/src/cli/repo/s3.go index c54dffe66..7618e08cd 100644 --- a/src/cli/repo/s3.go +++ b/src/cli/repo/s3.go @@ -57,8 +57,8 @@ func addS3Commands(cmd *cobra.Command) *cobra.Command { // More generic and more frequently used flags take precedence. fs.StringVar(&bucket, "bucket", "", "Name of S3 bucket for repo. (required)") fs.StringVar(&prefix, "prefix", "", "Repo prefix within bucket.") - fs.StringVar(&endpoint, "endpoint", "s3.amazonaws.com", "S3 service endpoint.") - fs.BoolVar(&doNotUseTLS, "disable-tls", false, "Disable TLS (HTTPS)") + fs.StringVar(&endpoint, "endpoint", "127.0.0.1:9000", "S3 service endpoint.") + fs.BoolVar(&doNotUseTLS, "disable-tls", true, "Disable TLS (HTTPS)") fs.BoolVar(&doNotVerifyTLS, "disable-tls-verification", false, "Disable TLS (HTTPS) certificate verification.") // In general, we don't want to expose this flag to users and have them mistake it diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go index f394d02b7..6497c30c0 100644 --- a/src/pkg/backup/details/details.go +++ b/src/pkg/backup/details/details.go @@ -1044,3 +1044,10 @@ func updateFolderWithinDrive( return nil } + +// ExtensionInfo describes extension data associated with an item +// TODO: Expose this store behind an interface which can synchrnoize access to the +// underlying map. +type ExtensionInfo struct { + Data map[string]any `json:"data,omitempty"` +} diff --git a/src/pkg/extensions/extensions.go b/src/pkg/extensions/extensions.go new file mode 100644 index 000000000..f3f699a9f --- /dev/null +++ b/src/pkg/extensions/extensions.go @@ -0,0 +1,67 @@ +package extensions + +import ( + "context" + "io" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/logger" +) + +// Extension client interface +type CorsoItemExtension interface { + io.ReadCloser +} + +type CorsoItemExtensionFactory func( + context.Context, + io.ReadCloser, + details.ItemInfo, + *details.ExtensionInfo, +) (CorsoItemExtension, error) + +// AddItemExtensions wraps provided readcloser with extensions +// supplied via factory +func AddItemExtensions( + ctx context.Context, + rc io.ReadCloser, + info details.ItemInfo, + factories []CorsoItemExtensionFactory, +) (CorsoItemExtension, *details.ExtensionInfo, error) { + // TODO: move to validate + if rc == nil { + return nil, nil, clues.New("nil inner readcloser") + } + + if len(factories) == 0 { + return nil, nil, clues.New("no extensions supplied") + } + + ctx = clues.Add(ctx, "num_extensions", len(factories)) + + extInfo := &details.ExtensionInfo{ + Data: make(map[string]any), + } + + logger.Ctx(ctx).Info("adding extensions") + + for _, factory := range factories { + if factory == nil { + return nil, nil, clues.New("nil extension factory") + } + + extRc, err := factory(ctx, rc, info, extInfo) + if err != nil { + return nil, nil, clues.Wrap(err, "creating extension") + } + + rc = extRc + } + + logger.Ctx(ctx).Info("added extensions") + + // TODO: Add an outermost extension for logging & metrics + return rc, extInfo, nil +} diff --git a/src/pkg/extensions/extensions_test.go b/src/pkg/extensions/extensions_test.go new file mode 100644 index 000000000..4600d7497 --- /dev/null +++ b/src/pkg/extensions/extensions_test.go @@ -0,0 +1,367 @@ +package extensions + +// Tests for extensions.go + +import ( + "bytes" + "context" + "hash/crc32" + "io" + "testing" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/logger" +) + +// Temporary, testing purposes only +type MockExtension struct { + // TODO: Add cumlulative crc32 checksum + numBytes int + crc32 uint32 + info details.ItemInfo + extInfo *details.ExtensionInfo + innerRc io.ReadCloser + ctx context.Context + failOnRead bool + failOnClose bool +} + +func (me *MockExtension) Read(p []byte) (int, error) { + if me.failOnRead { + return 0, clues.New("mock read error") + } + + n, err := me.innerRc.Read(p) + if err != nil && err != io.EOF { + logger.CtxErr(me.ctx, err).Error("inner read error") + return n, err + } + + me.numBytes += n + me.crc32 = crc32.Update(me.crc32, crc32.IEEETable, p[:n]) + + if err == io.EOF { + logger.Ctx(me.ctx).Info("mock extension reached EOF") + me.extInfo.Data["numBytes"] = me.numBytes + me.extInfo.Data["crc32"] = me.crc32 + } + + return n, err +} + +func (me *MockExtension) Close() error { + if me.failOnClose { + return clues.New("mock close error") + } + + err := me.innerRc.Close() + if err != nil { + return err + } + + me.extInfo.Data["numBytes"] = me.numBytes + me.extInfo.Data["crc32"] = me.crc32 + logger.Ctx(me.ctx).Infow( + "mock extension closed", + "numBytes", me.numBytes, "crc32", me.crc32) + + return nil +} + +func NewMockExtension( + ctx context.Context, + rc io.ReadCloser, + info details.ItemInfo, + extInfo *details.ExtensionInfo, +) (CorsoItemExtension, error) { + return &MockExtension{ + ctx: ctx, + innerRc: rc, + info: info, + extInfo: extInfo, + }, nil +} + +type ExtensionsUnitSuite struct { + tester.Suite +} + +func TestExtensionsUnitSuite(t *testing.T) { + suite.Run(t, &ExtensionsUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +// func readFrom(rc io.ReadCloser) error { +// defer rc.Close() + +// p := make([]byte, 4) + +// for { +// _, err := rc.Read(p) +// if err == io.EOF { +// break +// } + +// if err != nil { +// return err +// } +// } + +// return nil +// } + +func (suite *ExtensionsUnitSuite) TestAddItemExtensions() { + type outputValidationFunc func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool + + var ( + testRc = io.NopCloser(bytes.NewReader([]byte("some data"))) + testItemInfo = details.ItemInfo{ + OneDrive: &details.OneDriveInfo{ + DriveID: "driveID", + }, + } + ) + + table := []struct { + name string + factories []CorsoItemExtensionFactory + rc io.ReadCloser + validateOutputs outputValidationFunc + }{ + { + name: "happy path", + factories: []CorsoItemExtensionFactory{ + func( + ctx context.Context, + rc io.ReadCloser, + info details.ItemInfo, + extInfo *details.ExtensionInfo, + ) (CorsoItemExtension, error) { + return NewMockExtension(ctx, rc, info, extInfo) + }, + }, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err == nil && extRc != nil && extInfo != nil + }, + }, + { + name: "multiple valid factories", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + NewMockExtension, + NewMockExtension, + }, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err == nil && extRc != nil && extInfo != nil + }, + }, + { + name: "no factories supplied", + factories: nil, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "factory slice contains nil", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + nil, + NewMockExtension, + }, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "factory call returns error", + factories: []CorsoItemExtensionFactory{ + func( + ctx context.Context, + rc io.ReadCloser, + info details.ItemInfo, + extInfo *details.ExtensionInfo, + ) (CorsoItemExtension, error) { + return nil, clues.New("creating extension") + }, + }, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "one or more factory calls return error", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + func( + ctx context.Context, + rc io.ReadCloser, + info details.ItemInfo, + extInfo *details.ExtensionInfo, + ) (CorsoItemExtension, error) { + return nil, clues.New("creating extension") + }, + }, + rc: testRc, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "nil inner rc", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + }, + rc: nil, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "nil inner rc", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + }, + rc: nil, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + { + name: "nil info", + factories: []CorsoItemExtensionFactory{ + NewMockExtension, + }, + rc: nil, + validateOutputs: func( + extRc io.ReadCloser, + extInfo *details.ExtensionInfo, + err error, + ) bool { + return err != nil && extRc == nil && extInfo == nil + }, + }, + // verify wrap ordering + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + ctx, flush := tester.NewContext(t) + defer flush() + + extRc, extInfo, err := AddItemExtensions( + ctx, + test.rc, + testItemInfo, + test.factories) + require.True(t, test.validateOutputs(extRc, extInfo, err)) + }) + } +} + +// { +// name: "read_failure", +// factories: []CorsoItemExtensionFactory{ +// func( +// ctx context.Context, +// rc io.ReadCloser, +// info details.ItemInfo, +// extInfo *details.ExtensionInfo, +// ) (CorsoItemExtension, error) { +// mockExt, _ := NewMockExtension(ctx, rc, info, extInfo) +// mockExt.failOnRead = true +// return mockExt, nil +// }, +// }, +// payload: []byte("some data"), +// expectedErr: require.Error, +// rc: io.NopCloser(bytes.NewReader([]byte("some data"))), +// }, +// { +// name: "close_failure", +// factories: []CorsoItemExtensionFactory{ +// func( +// ctx context.Context, +// rc io.ReadCloser, +// info details.ItemInfo, +// extInfo *details.ExtensionInfo, +// ) (CorsoItemExtension, error) { +// mockExt, _ := NewMockExtension(ctx, rc, info, extInfo) +// mockExt.failOnClose = true +// return mockExt, nil +// }, +// }, +// payload: []byte("some data"), +// expectedErr: require.Error, +// rc: io.NopCloser(bytes.NewReader([]byte("some data"))), +// }, + +// for _, test := range table { +// suite.Run(test.name, func() { +// t := suite.T() +// ctx, flush := tester.NewContext(t) +// defer flush() + +// extRc, extInfo, err := AddItemExtensions( +// ctx, +// test.rc, +// details.ItemInfo{}, +// test.factories) +// require.NoError(suite.T(), err) + +// err = readFrom(extRc) +// require.NoError(suite.T(), err) + +// require.Equal(suite.T(), len(test.payload), extInfo.Data["numBytes"]) + +// // verify crc32 +// c := extInfo.Data["crc32"].(uint32) +// require.Equal(suite.T(), c, crc32.ChecksumIEEE(test.payload)) +// }) +// }