diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go index 2809dec76..df5ee573a 100644 --- a/src/internal/connector/exchange_data_collection.go +++ b/src/internal/connector/exchange_data_collection.go @@ -28,7 +28,7 @@ type DataCollection interface { // that can be consumed as a stream (it embeds io.Reader) type DataStream interface { // Returns an io.Reader for the DataStream - ToReader() io.Reader + ToReader() io.ReadCloser // Provides a unique identifier for this data UUID() string } @@ -103,6 +103,6 @@ func (ed *ExchangeData) UUID() string { return ed.id } -func (ed *ExchangeData) ToReader() io.Reader { - return bytes.NewReader(ed.message) +func (ed *ExchangeData) ToReader() io.ReadCloser { + return io.NopCloser(bytes.NewReader(ed.message)) } diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 3243f2f57..780389627 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -43,7 +43,7 @@ func (medc *MockExchangeDataCollection) NextItem() (connector.DataStream, error) medc.messagesRead++ // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) m := []byte("test message") - return &MockExchangeData{uuid.NewString(), bytes.NewReader(m)}, nil + return &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))}, nil } return nil, io.EOF } @@ -51,13 +51,13 @@ func (medc *MockExchangeDataCollection) NextItem() (connector.DataStream, error) // ExchangeData represents a single item retrieved from exchange type MockExchangeData struct { id string - reader io.Reader + reader io.ReadCloser } func (med *MockExchangeData) UUID() string { return med.id } -func (med *MockExchangeData) ToReader() io.Reader { +func (med *MockExchangeData) ToReader() io.ReadCloser { return med.reader }