From 05060e7d1f91360f67f83595b421b34324957300 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Fri, 29 Sep 2023 08:52:05 -0700 Subject: [PATCH] Create exported readers for injecting and retrieving serialization version info (#4378) Previously kopia wrapper has been transparently injecting and stripping out a "serialization version" for all items persisted in kopia. The version thus far has been hard-coded to `1` and has been stored in big endian format in the first 4 bytes of every kopia file This PR is a step towards getting serialization versions on a per-service/per-item basis. It exposes serialization readers that inject and strip out info to other packages This PR also slightly changes the serialization version format. The changes are two-fold: * the MSB is now used to represent if the item was deleted between the time it was discovered and when its data was requested * the serialization version number is set to uint16. This has no impact on existing persisted data since all versions are currently `1` (don't use more than 16-bits). This size can be expanded in the future if needed, but is mostly to enforce the idea that higher order bits shouldn't be used for version numbers right now --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * #4328 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- .../common/readers/serialization_version.go | 187 +++++++++ .../readers/serialization_version_test.go | 362 ++++++++++++++++++ 2 files changed, 549 insertions(+) create mode 100644 src/internal/common/readers/serialization_version.go create mode 100644 src/internal/common/readers/serialization_version_test.go diff --git a/src/internal/common/readers/serialization_version.go b/src/internal/common/readers/serialization_version.go new file mode 100644 index 000000000..a6713f959 --- /dev/null +++ b/src/internal/common/readers/serialization_version.go @@ -0,0 +1,187 @@ +package readers + +import ( + "bytes" + "encoding/binary" + "io" + "os" + "unsafe" + + "github.com/alcionai/clues" +) + +// persistedSerializationVersion is the size of the serialization version in +// storage. +// +// The current on-disk format of this field is written in big endian. The +// highest bit denotes if the item is empty because it was deleted between the +// time we told the storage about it and when we needed to get data for it. The +// lowest two bytes are the version number. All other bits are reserved for +// future use. +// +// MSB 31 30 16 8 0 LSB +// +----------+----+---------+--------+-------+ +// | del flag | reserved | version number | +// +----------+----+---------+--------+-------+ +type persistedSerializationVersion = uint32 + +// SerializationVersion is the in-memory size of the version number that gets +// added to the persisted serialization version. +// +// Right now it's only a uint16 but we can expand it to be larger so long as the +// expanded size doesn't clash with the flags in the high-order bits. +type SerializationVersion uint16 + +// DefaultSerializationVersion is the current (default) version number for all +// services. As services evolve their storage format they should begin tracking +// their own version numbers separate from other services. +const DefaultSerializationVersion SerializationVersion = 1 + +const ( + versionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0))) + delInFlightMask persistedSerializationVersion = 1 << ((versionFormatSize * 8) - 1) +) + +// SerializationFormat is a struct describing serialization format versions and +// flags to add for this item. +type SerializationFormat struct { + Version SerializationVersion + DelInFlight bool +} + +// NewVersionedBackupReader creates a reader that injects format into the first +// bytes of the returned data. After format has been returned, data is returned +// from baseReaders in the order they're passed in. +func NewVersionedBackupReader( + format SerializationFormat, + baseReaders ...io.ReadCloser, +) (io.ReadCloser, error) { + if format.DelInFlight && len(baseReaders) > 0 { + // This is a conservative check, but we can always loosen it later on if + // needed. At the moment we really don't expect any data if the item was + // deleted. + return nil, clues.New("item marked deleted but has reader(s)") + } + + formattedVersion := persistedSerializationVersion(format.Version) + if format.DelInFlight { + formattedVersion |= delInFlightMask + } + + formattedBuf := make([]byte, versionFormatSize) + binary.BigEndian.PutUint32(formattedBuf, formattedVersion) + + versionReader := io.NopCloser(bytes.NewReader(formattedBuf)) + + // Need to add readers individually because types differ. + allReaders := make([]io.Reader, 0, len(baseReaders)+1) + allReaders = append(allReaders, versionReader) + + for _, r := range baseReaders { + allReaders = append(allReaders, r) + } + + res := &versionedBackupReader{ + baseReaders: append([]io.ReadCloser{versionReader}, baseReaders...), + combined: io.MultiReader(allReaders...), + } + + return res, nil +} + +type versionedBackupReader struct { + // baseReaders is a reference to the original readers so we can close them. + baseReaders []io.ReadCloser + // combined is the reader that will return all data. + combined io.Reader +} + +func (vbr *versionedBackupReader) Read(p []byte) (int, error) { + if vbr.combined == nil { + return 0, os.ErrClosed + } + + n, err := vbr.combined.Read(p) + if err == io.EOF { + // Golang doesn't allow wrapping of EOF. If we wrap it other things start + // thinking it's an actual error. + return n, err + } + + return n, clues.Stack(err).OrNil() +} + +func (vbr *versionedBackupReader) Close() error { + if vbr.combined == nil { + return nil + } + + vbr.combined = nil + + var errs *clues.Err + + for i, r := range vbr.baseReaders { + if err := r.Close(); err != nil { + errs = clues.Stack( + errs, + clues.Wrap(err, "closing reader").With("reader_index", i)) + } + } + + vbr.baseReaders = nil + + return errs.OrNil() +} + +// NewVersionedRestoreReader wraps baseReader and provides easy access to the +// SerializationFormat info in the first bytes of the data contained in +// baseReader. +func NewVersionedRestoreReader( + baseReader io.ReadCloser, +) (*VersionedRestoreReader, error) { + versionBuf := make([]byte, versionFormatSize) + + // Loop to account for the unlikely case where we get a short read. + for read := 0; read < versionFormatSize; { + n, err := baseReader.Read(versionBuf[read:]) + if err != nil { + return nil, clues.Wrap(err, "reading serialization version") + } + + read += n + } + + formattedVersion := binary.BigEndian.Uint32(versionBuf) + + return &VersionedRestoreReader{ + baseReader: baseReader, + format: SerializationFormat{ + Version: SerializationVersion(formattedVersion), + DelInFlight: (formattedVersion & delInFlightMask) != 0, + }, + }, nil +} + +type VersionedRestoreReader struct { + baseReader io.ReadCloser + format SerializationFormat +} + +func (vrr *VersionedRestoreReader) Read(p []byte) (int, error) { + n, err := vrr.baseReader.Read(p) + if err == io.EOF { + // Golang doesn't allow wrapping of EOF. If we wrap it other things start + // thinking it's an actual error. + return n, err + } + + return n, clues.Stack(err).OrNil() +} + +func (vrr *VersionedRestoreReader) Close() error { + return clues.Stack(vrr.baseReader.Close()).OrNil() +} + +func (vrr VersionedRestoreReader) Format() SerializationFormat { + return vrr.format +} diff --git a/src/internal/common/readers/serialization_version_test.go b/src/internal/common/readers/serialization_version_test.go new file mode 100644 index 000000000..7d99c7721 --- /dev/null +++ b/src/internal/common/readers/serialization_version_test.go @@ -0,0 +1,362 @@ +package readers_test + +import ( + "bytes" + "io" + "testing" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "golang.org/x/exp/slices" + + "github.com/alcionai/corso/src/internal/common/readers" + "github.com/alcionai/corso/src/internal/tester" +) + +type shortReader struct { + maxReadLen int + io.ReadCloser +} + +func (s *shortReader) Read(p []byte) (int, error) { + toRead := s.maxReadLen + if len(p) < toRead { + toRead = len(p) + } + + return s.ReadCloser.Read(p[:toRead]) +} + +type SerializationReaderUnitSuite struct { + tester.Suite +} + +func TestSerializationReaderUnitSuite(t *testing.T) { + suite.Run(t, &SerializationReaderUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *SerializationReaderUnitSuite) TestBackupSerializationReader() { + baseData := []byte("hello world") + + table := []struct { + name string + format readers.SerializationFormat + inputReaders []io.ReadCloser + + expectErr require.ErrorAssertionFunc + expectData []byte + }{ + { + name: "DeletedInFlight NoVersion NoReaders", + format: readers.SerializationFormat{ + DelInFlight: true, + }, + expectErr: require.NoError, + expectData: []byte{0x80, 0x0, 0x0, 0x0}, + }, + { + name: "DeletedInFlight NoReaders", + format: readers.SerializationFormat{ + Version: 42, + DelInFlight: true, + }, + expectErr: require.NoError, + expectData: []byte{0x80, 0x0, 0x0, 42}, + }, + { + name: "NoVersion NoReaders", + expectErr: require.NoError, + expectData: []byte{0x00, 0x0, 0x0, 0x0}, + }, + { + name: "NoReaders", + format: readers.SerializationFormat{ + Version: 42, + }, + expectErr: require.NoError, + expectData: []byte{0x00, 0x0, 0x0, 42}, + }, + { + name: "SingleReader", + format: readers.SerializationFormat{ + Version: 42, + }, + inputReaders: []io.ReadCloser{io.NopCloser(bytes.NewReader(baseData))}, + expectErr: require.NoError, + expectData: append([]byte{0x00, 0x0, 0x0, 42}, baseData...), + }, + { + name: "MultipleReaders", + format: readers.SerializationFormat{ + Version: 42, + }, + inputReaders: []io.ReadCloser{ + io.NopCloser(bytes.NewReader(baseData)), + io.NopCloser(bytes.NewReader(baseData)), + }, + expectErr: require.NoError, + expectData: append( + append([]byte{0x00, 0x0, 0x0, 42}, baseData...), + baseData...), + }, + // Uncomment if we expand the version to 32 bits. + //{ + // name: "VersionWithHighBitSet NoReaders Errors", + // format: readers.SerializationFormat{ + // Version: 0x80000000, + // }, + // expectErr: require.Error, + //}, + { + name: "DeletedInFlight SingleReader Errors", + format: readers.SerializationFormat{ + DelInFlight: true, + }, + inputReaders: []io.ReadCloser{io.NopCloser(bytes.NewReader(baseData))}, + expectErr: require.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + r, err := readers.NewVersionedBackupReader( + test.format, + test.inputReaders...) + test.expectErr(t, err, "getting backup reader: %v", clues.ToCore(err)) + + if err != nil { + return + } + + defer func() { + err := r.Close() + assert.NoError(t, err, "closing reader: %v", clues.ToCore(err)) + }() + + buf, err := io.ReadAll(r) + require.NoError( + t, + err, + "reading serialized data: %v", + clues.ToCore(err)) + + // Need to use equal because output is order-sensitive. + assert.Equal(t, test.expectData, buf, "serialized data") + }) + } +} + +func (suite *SerializationReaderUnitSuite) TestBackupSerializationReader_ShortReads() { + t := suite.T() + + baseData := []byte("hello world") + expectData := append( + append([]byte{0x00, 0x0, 0x0, 42}, baseData...), + baseData...) + + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{Version: 42}, + io.NopCloser(bytes.NewReader(baseData)), + io.NopCloser(bytes.NewReader(baseData))) + require.NoError(t, err, "getting backup reader: %v", clues.ToCore(err)) + + defer func() { + err := r.Close() + assert.NoError(t, err, "closing reader: %v", clues.ToCore(err)) + }() + + buf := make([]byte, len(expectData)) + r = &shortReader{ + maxReadLen: 3, + ReadCloser: r, + } + + for read := 0; ; { + n, err := r.Read(buf[read:]) + + read += n + if read >= len(buf) { + break + } + + require.NoError(t, err, "reading data: %v", clues.ToCore(err)) + } + + // Need to use equal because output is order-sensitive. + assert.Equal(t, expectData, buf, "serialized data") +} + +// TestRestoreSerializationReader checks that we can read previously serialized +// data. For simplicity, it uses the versionedBackupReader to generate the +// input. This should be relatively safe because the tests for +// versionedBackupReader do compare directly against serialized data. +func (suite *SerializationReaderUnitSuite) TestRestoreSerializationReader() { + baseData := []byte("hello world") + + table := []struct { + name string + inputReader func(*testing.T) io.ReadCloser + + expectErr require.ErrorAssertionFunc + expectVersion readers.SerializationVersion + expectDelInFlight bool + expectData []byte + }{ + { + name: "NoVersion NoReaders", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader(readers.SerializationFormat{}) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + return r + }, + expectErr: require.NoError, + expectData: []byte{}, + }, + { + name: "DeletedInFlight NoReaders", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{ + Version: 42, + DelInFlight: true, + }) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + return r + }, + expectErr: require.NoError, + expectVersion: 42, + expectDelInFlight: true, + expectData: []byte{}, + }, + { + name: "DeletedInFlight SingleReader", + inputReader: func(t *testing.T) io.ReadCloser { + // Need to specify the bytes manually because the backup reader won't + // allow creating something with the deleted flag and data. + return io.NopCloser(bytes.NewReader(append( + []byte{0x80, 0x0, 0x0, 42}, + baseData...))) + }, + expectErr: require.NoError, + expectVersion: 42, + expectDelInFlight: true, + expectData: baseData, + }, + { + name: "NoVersion SingleReader", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{}, + io.NopCloser(bytes.NewReader(baseData))) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + return r + }, + expectErr: require.NoError, + expectData: baseData, + }, + { + name: "SingleReader", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{Version: 42}, + io.NopCloser(bytes.NewReader(baseData))) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + return r + }, + expectErr: require.NoError, + expectVersion: 42, + expectData: baseData, + }, + { + name: "ShortReads SingleReader", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{Version: 42}, + io.NopCloser(bytes.NewReader(baseData))) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + r = &shortReader{ + maxReadLen: 3, + ReadCloser: r, + } + + return r + }, + expectErr: require.NoError, + expectVersion: 42, + expectData: baseData, + }, + { + name: "MultipleReaders", + inputReader: func(t *testing.T) io.ReadCloser { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{Version: 42}, + io.NopCloser(bytes.NewReader(baseData)), + io.NopCloser(bytes.NewReader(baseData))) + require.NoError(t, err, "making reader: %v", clues.ToCore(err)) + + return r + }, + expectErr: require.NoError, + expectVersion: 42, + expectData: append(slices.Clone(baseData), baseData...), + }, + { + name: "EmptyReader Errors", + inputReader: func(t *testing.T) io.ReadCloser { + return io.NopCloser(bytes.NewReader([]byte{})) + }, + expectErr: require.Error, + }, + { + name: "TruncatedVersion Errors", + inputReader: func(t *testing.T) io.ReadCloser { + return io.NopCloser(bytes.NewReader([]byte{0x80, 0x0})) + }, + expectErr: require.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + r, err := readers.NewVersionedRestoreReader(test.inputReader(t)) + test.expectErr(t, err, "getting restore reader: %v", clues.ToCore(err)) + + if err != nil { + return + } + + defer func() { + err := r.Close() + assert.NoError(t, err, "closing reader: %v", clues.ToCore(err)) + }() + + assert.Equal( + t, + test.expectVersion, + r.Format().Version, + "version") + assert.Equal( + t, + test.expectDelInFlight, + r.Format().DelInFlight, + "deleted in flight") + + buf, err := io.ReadAll(r) + require.NoError(t, err, "reading serialized data: %v", clues.ToCore(err)) + + // Need to use equal because output is order-sensitive. + assert.Equal(t, test.expectData, buf, "serialized data") + }) + } +}