Create exported readers for injecting and retrieving serialization version info (#4378)

Previously kopia wrapper has been transparently injecting
and stripping out a "serialization version" for all items
persisted in kopia. The version thus far has been
hard-coded to `1` and has been stored in big endian format
in the first 4 bytes of every kopia file

This PR is a step towards getting serialization versions
on a per-service/per-item basis. It exposes serialization
readers that inject and strip out info to other packages

This PR also slightly changes the serialization version
format. The changes are two-fold:
* the MSB is now used to represent if the item was
  deleted between the time it was discovered and when
  its data was requested
* the serialization version number is set to uint16. This
  has no impact on existing persisted data since all
  versions are currently `1` (don't use more than
  16-bits). This size can be expanded in the future if
  needed, but is mostly to enforce the idea that higher
  order bits shouldn't be used for version numbers right
  now

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4328

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-09-29 08:52:05 -07:00 committed by GitHub
parent 375019a988
commit 05060e7d1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 549 additions and 0 deletions

View File

@ -0,0 +1,187 @@
package readers
import (
"bytes"
"encoding/binary"
"io"
"os"
"unsafe"
"github.com/alcionai/clues"
)
// persistedSerializationVersion is the size of the serialization version in
// storage.
//
// The current on-disk format of this field is written in big endian. The
// highest bit denotes if the item is empty because it was deleted between the
// time we told the storage about it and when we needed to get data for it. The
// lowest two bytes are the version number. All other bits are reserved for
// future use.
//
// MSB 31 30 16 8 0 LSB
// +----------+----+---------+--------+-------+
// | del flag | reserved | version number |
// +----------+----+---------+--------+-------+
type persistedSerializationVersion = uint32
// SerializationVersion is the in-memory size of the version number that gets
// added to the persisted serialization version.
//
// Right now it's only a uint16 but we can expand it to be larger so long as the
// expanded size doesn't clash with the flags in the high-order bits.
type SerializationVersion uint16
// DefaultSerializationVersion is the current (default) version number for all
// services. As services evolve their storage format they should begin tracking
// their own version numbers separate from other services.
const DefaultSerializationVersion SerializationVersion = 1
const (
versionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0)))
delInFlightMask persistedSerializationVersion = 1 << ((versionFormatSize * 8) - 1)
)
// SerializationFormat is a struct describing serialization format versions and
// flags to add for this item.
type SerializationFormat struct {
Version SerializationVersion
DelInFlight bool
}
// NewVersionedBackupReader creates a reader that injects format into the first
// bytes of the returned data. After format has been returned, data is returned
// from baseReaders in the order they're passed in.
func NewVersionedBackupReader(
format SerializationFormat,
baseReaders ...io.ReadCloser,
) (io.ReadCloser, error) {
if format.DelInFlight && len(baseReaders) > 0 {
// This is a conservative check, but we can always loosen it later on if
// needed. At the moment we really don't expect any data if the item was
// deleted.
return nil, clues.New("item marked deleted but has reader(s)")
}
formattedVersion := persistedSerializationVersion(format.Version)
if format.DelInFlight {
formattedVersion |= delInFlightMask
}
formattedBuf := make([]byte, versionFormatSize)
binary.BigEndian.PutUint32(formattedBuf, formattedVersion)
versionReader := io.NopCloser(bytes.NewReader(formattedBuf))
// Need to add readers individually because types differ.
allReaders := make([]io.Reader, 0, len(baseReaders)+1)
allReaders = append(allReaders, versionReader)
for _, r := range baseReaders {
allReaders = append(allReaders, r)
}
res := &versionedBackupReader{
baseReaders: append([]io.ReadCloser{versionReader}, baseReaders...),
combined: io.MultiReader(allReaders...),
}
return res, nil
}
type versionedBackupReader struct {
// baseReaders is a reference to the original readers so we can close them.
baseReaders []io.ReadCloser
// combined is the reader that will return all data.
combined io.Reader
}
func (vbr *versionedBackupReader) Read(p []byte) (int, error) {
if vbr.combined == nil {
return 0, os.ErrClosed
}
n, err := vbr.combined.Read(p)
if err == io.EOF {
// Golang doesn't allow wrapping of EOF. If we wrap it other things start
// thinking it's an actual error.
return n, err
}
return n, clues.Stack(err).OrNil()
}
func (vbr *versionedBackupReader) Close() error {
if vbr.combined == nil {
return nil
}
vbr.combined = nil
var errs *clues.Err
for i, r := range vbr.baseReaders {
if err := r.Close(); err != nil {
errs = clues.Stack(
errs,
clues.Wrap(err, "closing reader").With("reader_index", i))
}
}
vbr.baseReaders = nil
return errs.OrNil()
}
// NewVersionedRestoreReader wraps baseReader and provides easy access to the
// SerializationFormat info in the first bytes of the data contained in
// baseReader.
func NewVersionedRestoreReader(
baseReader io.ReadCloser,
) (*VersionedRestoreReader, error) {
versionBuf := make([]byte, versionFormatSize)
// Loop to account for the unlikely case where we get a short read.
for read := 0; read < versionFormatSize; {
n, err := baseReader.Read(versionBuf[read:])
if err != nil {
return nil, clues.Wrap(err, "reading serialization version")
}
read += n
}
formattedVersion := binary.BigEndian.Uint32(versionBuf)
return &VersionedRestoreReader{
baseReader: baseReader,
format: SerializationFormat{
Version: SerializationVersion(formattedVersion),
DelInFlight: (formattedVersion & delInFlightMask) != 0,
},
}, nil
}
type VersionedRestoreReader struct {
baseReader io.ReadCloser
format SerializationFormat
}
func (vrr *VersionedRestoreReader) Read(p []byte) (int, error) {
n, err := vrr.baseReader.Read(p)
if err == io.EOF {
// Golang doesn't allow wrapping of EOF. If we wrap it other things start
// thinking it's an actual error.
return n, err
}
return n, clues.Stack(err).OrNil()
}
func (vrr *VersionedRestoreReader) Close() error {
return clues.Stack(vrr.baseReader.Close()).OrNil()
}
func (vrr VersionedRestoreReader) Format() SerializationFormat {
return vrr.format
}

View File

@ -0,0 +1,362 @@
package readers_test
import (
"bytes"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/tester"
)
type shortReader struct {
maxReadLen int
io.ReadCloser
}
func (s *shortReader) Read(p []byte) (int, error) {
toRead := s.maxReadLen
if len(p) < toRead {
toRead = len(p)
}
return s.ReadCloser.Read(p[:toRead])
}
type SerializationReaderUnitSuite struct {
tester.Suite
}
func TestSerializationReaderUnitSuite(t *testing.T) {
suite.Run(t, &SerializationReaderUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *SerializationReaderUnitSuite) TestBackupSerializationReader() {
baseData := []byte("hello world")
table := []struct {
name string
format readers.SerializationFormat
inputReaders []io.ReadCloser
expectErr require.ErrorAssertionFunc
expectData []byte
}{
{
name: "DeletedInFlight NoVersion NoReaders",
format: readers.SerializationFormat{
DelInFlight: true,
},
expectErr: require.NoError,
expectData: []byte{0x80, 0x0, 0x0, 0x0},
},
{
name: "DeletedInFlight NoReaders",
format: readers.SerializationFormat{
Version: 42,
DelInFlight: true,
},
expectErr: require.NoError,
expectData: []byte{0x80, 0x0, 0x0, 42},
},
{
name: "NoVersion NoReaders",
expectErr: require.NoError,
expectData: []byte{0x00, 0x0, 0x0, 0x0},
},
{
name: "NoReaders",
format: readers.SerializationFormat{
Version: 42,
},
expectErr: require.NoError,
expectData: []byte{0x00, 0x0, 0x0, 42},
},
{
name: "SingleReader",
format: readers.SerializationFormat{
Version: 42,
},
inputReaders: []io.ReadCloser{io.NopCloser(bytes.NewReader(baseData))},
expectErr: require.NoError,
expectData: append([]byte{0x00, 0x0, 0x0, 42}, baseData...),
},
{
name: "MultipleReaders",
format: readers.SerializationFormat{
Version: 42,
},
inputReaders: []io.ReadCloser{
io.NopCloser(bytes.NewReader(baseData)),
io.NopCloser(bytes.NewReader(baseData)),
},
expectErr: require.NoError,
expectData: append(
append([]byte{0x00, 0x0, 0x0, 42}, baseData...),
baseData...),
},
// Uncomment if we expand the version to 32 bits.
//{
// name: "VersionWithHighBitSet NoReaders Errors",
// format: readers.SerializationFormat{
// Version: 0x80000000,
// },
// expectErr: require.Error,
//},
{
name: "DeletedInFlight SingleReader Errors",
format: readers.SerializationFormat{
DelInFlight: true,
},
inputReaders: []io.ReadCloser{io.NopCloser(bytes.NewReader(baseData))},
expectErr: require.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
r, err := readers.NewVersionedBackupReader(
test.format,
test.inputReaders...)
test.expectErr(t, err, "getting backup reader: %v", clues.ToCore(err))
if err != nil {
return
}
defer func() {
err := r.Close()
assert.NoError(t, err, "closing reader: %v", clues.ToCore(err))
}()
buf, err := io.ReadAll(r)
require.NoError(
t,
err,
"reading serialized data: %v",
clues.ToCore(err))
// Need to use equal because output is order-sensitive.
assert.Equal(t, test.expectData, buf, "serialized data")
})
}
}
func (suite *SerializationReaderUnitSuite) TestBackupSerializationReader_ShortReads() {
t := suite.T()
baseData := []byte("hello world")
expectData := append(
append([]byte{0x00, 0x0, 0x0, 42}, baseData...),
baseData...)
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{Version: 42},
io.NopCloser(bytes.NewReader(baseData)),
io.NopCloser(bytes.NewReader(baseData)))
require.NoError(t, err, "getting backup reader: %v", clues.ToCore(err))
defer func() {
err := r.Close()
assert.NoError(t, err, "closing reader: %v", clues.ToCore(err))
}()
buf := make([]byte, len(expectData))
r = &shortReader{
maxReadLen: 3,
ReadCloser: r,
}
for read := 0; ; {
n, err := r.Read(buf[read:])
read += n
if read >= len(buf) {
break
}
require.NoError(t, err, "reading data: %v", clues.ToCore(err))
}
// Need to use equal because output is order-sensitive.
assert.Equal(t, expectData, buf, "serialized data")
}
// TestRestoreSerializationReader checks that we can read previously serialized
// data. For simplicity, it uses the versionedBackupReader to generate the
// input. This should be relatively safe because the tests for
// versionedBackupReader do compare directly against serialized data.
func (suite *SerializationReaderUnitSuite) TestRestoreSerializationReader() {
baseData := []byte("hello world")
table := []struct {
name string
inputReader func(*testing.T) io.ReadCloser
expectErr require.ErrorAssertionFunc
expectVersion readers.SerializationVersion
expectDelInFlight bool
expectData []byte
}{
{
name: "NoVersion NoReaders",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(readers.SerializationFormat{})
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
return r
},
expectErr: require.NoError,
expectData: []byte{},
},
{
name: "DeletedInFlight NoReaders",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{
Version: 42,
DelInFlight: true,
})
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
return r
},
expectErr: require.NoError,
expectVersion: 42,
expectDelInFlight: true,
expectData: []byte{},
},
{
name: "DeletedInFlight SingleReader",
inputReader: func(t *testing.T) io.ReadCloser {
// Need to specify the bytes manually because the backup reader won't
// allow creating something with the deleted flag and data.
return io.NopCloser(bytes.NewReader(append(
[]byte{0x80, 0x0, 0x0, 42},
baseData...)))
},
expectErr: require.NoError,
expectVersion: 42,
expectDelInFlight: true,
expectData: baseData,
},
{
name: "NoVersion SingleReader",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{},
io.NopCloser(bytes.NewReader(baseData)))
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
return r
},
expectErr: require.NoError,
expectData: baseData,
},
{
name: "SingleReader",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{Version: 42},
io.NopCloser(bytes.NewReader(baseData)))
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
return r
},
expectErr: require.NoError,
expectVersion: 42,
expectData: baseData,
},
{
name: "ShortReads SingleReader",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{Version: 42},
io.NopCloser(bytes.NewReader(baseData)))
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
r = &shortReader{
maxReadLen: 3,
ReadCloser: r,
}
return r
},
expectErr: require.NoError,
expectVersion: 42,
expectData: baseData,
},
{
name: "MultipleReaders",
inputReader: func(t *testing.T) io.ReadCloser {
r, err := readers.NewVersionedBackupReader(
readers.SerializationFormat{Version: 42},
io.NopCloser(bytes.NewReader(baseData)),
io.NopCloser(bytes.NewReader(baseData)))
require.NoError(t, err, "making reader: %v", clues.ToCore(err))
return r
},
expectErr: require.NoError,
expectVersion: 42,
expectData: append(slices.Clone(baseData), baseData...),
},
{
name: "EmptyReader Errors",
inputReader: func(t *testing.T) io.ReadCloser {
return io.NopCloser(bytes.NewReader([]byte{}))
},
expectErr: require.Error,
},
{
name: "TruncatedVersion Errors",
inputReader: func(t *testing.T) io.ReadCloser {
return io.NopCloser(bytes.NewReader([]byte{0x80, 0x0}))
},
expectErr: require.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
r, err := readers.NewVersionedRestoreReader(test.inputReader(t))
test.expectErr(t, err, "getting restore reader: %v", clues.ToCore(err))
if err != nil {
return
}
defer func() {
err := r.Close()
assert.NoError(t, err, "closing reader: %v", clues.ToCore(err))
}()
assert.Equal(
t,
test.expectVersion,
r.Format().Version,
"version")
assert.Equal(
t,
test.expectDelInFlight,
r.Format().DelInFlight,
"deleted in flight")
buf, err := io.ReadAll(r)
require.NoError(t, err, "reading serialized data: %v", clues.ToCore(err))
// Need to use equal because output is order-sensitive.
assert.Equal(t, test.expectData, buf, "serialized data")
})
}
}