diff --git a/src/internal/common/readers/serialization_version.go b/src/internal/common/readers/serialization_version.go
index a6713f959..f203c3233 100644
--- a/src/internal/common/readers/serialization_version.go
+++ b/src/internal/common/readers/serialization_version.go
@@ -38,8 +38,8 @@ type SerializationVersion uint16
 const DefaultSerializationVersion SerializationVersion = 1
 
 const (
-	versionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0)))
-	delInFlightMask   persistedSerializationVersion = 1 << ((versionFormatSize * 8) - 1)
+	VersionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0)))
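+	// The high bit of the persisted version value is reserved to flag items
+	// that were deleted while the backup was in flight.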
+	delInFlightMask   persistedSerializationVersion = 1 << ((VersionFormatSize * 8) - 1)
 )
 
 // SerializationFormat is a struct describing serialization format versions and
@@ -68,7 +68,7 @@ func NewVersionedBackupReader(
 		formattedVersion |= delInFlightMask
 	}
 
-	formattedBuf := make([]byte, versionFormatSize)
+	formattedBuf := make([]byte, VersionFormatSize)
 	binary.BigEndian.PutUint32(formattedBuf, formattedVersion)
 
 	versionReader := io.NopCloser(bytes.NewReader(formattedBuf))
@@ -139,10 +139,10 @@ func (vbr *versionedBackupReader) Close() error {
 func NewVersionedRestoreReader(
 	baseReader io.ReadCloser,
 ) (*VersionedRestoreReader, error) {
-	versionBuf := make([]byte, versionFormatSize)
+	versionBuf := make([]byte, VersionFormatSize)
 
 	// Loop to account for the unlikely case where we get a short read.
-	for read := 0; read < versionFormatSize; {
+	for read := 0; read < VersionFormatSize; {
 		n, err := baseReader.Read(versionBuf[read:])
 		if err != nil {
 			return nil, clues.Wrap(err, "reading serialization version")
diff --git a/src/internal/data/item.go b/src/internal/data/item.go
index 6d316ad6b..c6cb064e7 100644
--- a/src/internal/data/item.go
+++ b/src/internal/data/item.go
@@ -1,7 +1,6 @@
 package data
 
 import (
-	"bytes"
 	"context"
 	"io"
 	"sync"
@@ -10,6 +9,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/spatialcurrent/go-lazy/pkg/lazy"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
@@ -46,12 +46,19 @@ func NewUnindexedPrefetchedItem(
 	reader io.ReadCloser,
 	itemID string,
 	modTime time.Time,
-) Item {
+) (*unindexedPrefetchedItem, error) {
+	r, err := readers.NewVersionedBackupReader(
+		readers.SerializationFormat{Version: readers.DefaultSerializationVersion},
+		reader)
+	if err != nil {
+		return nil, clues.Stack(err)
+	}
+
 	return &unindexedPrefetchedItem{
 		id:      itemID,
-		reader:  reader,
+		reader:  r,
 		modTime: modTime,
-	}
+	}, nil
 }
 
 // unindexedPrefetchedItem represents a single item retrieved from the remote
@@ -92,15 +99,16 @@ func NewPrefetchedItem(
 	reader io.ReadCloser,
 	itemID string,
 	info details.ItemInfo,
-) Item {
-	return &prefetchedItem{
-		unindexedPrefetchedItem: unindexedPrefetchedItem{
-			id:      itemID,
-			reader:  reader,
-			modTime: info.Modified(),
-		},
-		info: info,
+) (*prefetchedItem, error) {
+	inner, err := NewUnindexedPrefetchedItem(reader, itemID, info.Modified())
+	if err != nil {
+		return nil, clues.Stack(err)
 	}
+
+	return &prefetchedItem{
+		unindexedPrefetchedItem: inner,
+		info:                    info,
+	}, nil
 }
 
 // prefetchedItem represents a single item retrieved from the remote service.
 //
 // This item implements ItemInfo so it should be used for things that need to
 // appear in backup details.
 type prefetchedItem struct {
-	unindexedPrefetchedItem
+	*unindexedPrefetchedItem
 
 	info details.ItemInfo
 }
@@ -129,7 +137,7 @@ func NewUnindexedLazyItem(
 	itemID string,
 	modTime time.Time,
 	errs *fault.Bus,
-) Item {
+) *unindexedLazyItem {
 	return &unindexedLazyItem{
 		ctx:        ctx,
 		id:         itemID,
@@ -182,6 +190,10 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser {
 			return nil, clues.Stack(err)
 		}
 
+		format := readers.SerializationFormat{
+			Version: readers.DefaultSerializationVersion,
+		}
+
 		// If an item was deleted then return an empty file so we don't fail the
 		// backup and return a sentinel error when asked for ItemInfo so we don't
 		// display the item in the backup.
@@ -193,13 +205,17 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser {
 			logger.Ctx(i.ctx).Info("item not found")
 
 			i.delInFlight = true
+			format.DelInFlight = true
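+			// No data reader is passed here, so the item's content becomes
+			// just the serialization header with the deleted-in-flight flag
+			// set.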
+			r, err := readers.NewVersionedBackupReader(format)
 
-			return io.NopCloser(bytes.NewReader([]byte{})), nil
+			return r, clues.Stack(err).OrNil()
 		}
 
 		i.info = info
 
-		return reader, nil
+		r, err := readers.NewVersionedBackupReader(format, reader)
+
+		return r, clues.Stack(err).OrNil()
 	})
 }
 
@@ -217,15 +233,14 @@ func NewLazyItem(
 	itemID string,
 	modTime time.Time,
 	errs *fault.Bus,
-) Item {
+) *lazyItem {
 	return &lazyItem{
-		unindexedLazyItem: unindexedLazyItem{
-			ctx:        ctx,
-			id:         itemID,
-			itemGetter: itemGetter,
-			modTime:    modTime,
-			errs:       errs,
-		},
+		unindexedLazyItem: NewUnindexedLazyItem(
+			ctx,
+			itemGetter,
+			itemID,
+			modTime,
+			errs),
 	}
 }
 
@@ -236,7 +251,7 @@ func NewLazyItem(
 // This item implements ItemInfo so it should be used for things that need to
 // appear in backup details.
 type lazyItem struct {
-	unindexedLazyItem
+	*unindexedLazyItem
 }
 
 func (i *lazyItem) Info() (details.ItemInfo, error) {
diff --git a/src/internal/data/item_test.go b/src/internal/data/item_test.go
index 9484613e4..f0c7e9009 100644
--- a/src/internal/data/item_test.go
+++ b/src/internal/data/item_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/backup/details"
@@ -50,11 +51,15 @@
 }
 
 func (suite *ItemUnitSuite) TestUnindexedPrefetchedItem() {
-	prefetch := data.NewUnindexedPrefetchedItem(
+	prefetch, err := data.NewUnindexedPrefetchedItem(
 		io.NopCloser(bytes.NewReader([]byte{})),
 		"foo",
 		time.Time{})
-	_, ok := prefetch.(data.ItemInfo)
+	require.NoError(suite.T(), err, clues.ToCore(err))
+
+	var item data.Item = prefetch
+
+	_, ok := item.(data.ItemInfo)
 	assert.False(suite.T(), ok, "unindexedPrefetchedItem implements Info()")
 }
 
@@ -70,7 +75,10 @@ func (suite *ItemUnitSuite) TestUnindexedLazyItem() {
 		"foo",
 		time.Time{},
 		fault.New(true))
-	_, ok := lazy.(data.ItemInfo)
+
+	var item data.Item = lazy
+
+	_, ok := item.(data.ItemInfo)
 	assert.False(t, ok, "unindexedLazyItem implements Info()")
 }
 
@@ -140,18 +148,29 @@ func (suite *ItemUnitSuite) TestPrefetchedItem() {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			item := data.NewPrefetchedItem(test.reader, id, test.info)
+			item, err := data.NewPrefetchedItem(test.reader, id, test.info)
+			require.NoError(t, err, clues.ToCore(err))
 
 			assert.Equal(t, id, item.ID(), "ID")
 			assert.False(t, item.Deleted(), "deleted")
 			assert.Equal(
 				t,
 				test.info.Modified(),
-				item.(data.ItemModTime).ModTime(),
+				item.ModTime(),
 				"mod time")
 
-			readData, err := io.ReadAll(item.ToReader())
-			test.readErr(t, err, clues.ToCore(err), "read error")
+			r, err := readers.NewVersionedRestoreReader(item.ToReader())
+			require.NoError(t, err, "version error: %v", clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
+
+			readData, err := io.ReadAll(r)
+			test.readErr(t, err, "read error: %v", clues.ToCore(err))
 			assert.Equal(t, test.expectData, readData, "read data")
 		})
 	}
@@ -194,6 +213,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 	table := []struct {
 		name       string
 		mid        *mockItemDataGetter
+		versionErr assert.ErrorAssertionFunc
 		readErr    assert.ErrorAssertionFunc
 		infoErr    assert.ErrorAssertionFunc
 		expectData []byte
@@ -205,6 +225,7 @@
 				reader: io.NopCloser(bytes.NewReader([]byte{})),
 				info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
 			},
+			versionErr: assert.NoError,
 			readErr:    assert.NoError,
 			infoErr:    assert.NoError,
 			expectData: []byte{},
@@ -215,6 +236,7 @@
 				reader: io.NopCloser(bytes.NewReader(baseData)),
 				info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
 			},
+			versionErr: assert.NoError,
 			readErr:    assert.NoError,
 			infoErr:    assert.NoError,
 			expectData: baseData,
@@ -225,6 +247,7 @@
 			mid: &mockItemDataGetter{
 				reader: io.NopCloser(bytes.NewReader(baseData)),
 				info:   &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
 			},
+			versionErr: assert.NoError,
 			readErr:    assert.NoError,
 			infoErr:    assert.NoError,
 			expectData: baseData,
@@ -234,6 +257,7 @@
 			mid: &mockItemDataGetter{
 				err: assert.AnError,
 			},
+			versionErr: assert.Error,
 			readErr:    assert.Error,
 			infoErr:    assert.Error,
 			expectData: []byte{},
@@ -249,6 +273,7 @@
 			},
 				info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
 			},
+			versionErr: assert.NoError,
 			readErr:    assert.Error,
 			infoErr:    assert.NoError,
 			expectData: baseData[:5],
@@ -278,15 +303,25 @@
 			assert.Equal(
 				t,
 				now,
-				item.(data.ItemModTime).ModTime(),
+				item.ModTime(),
 				"mod time")
 
 			// Read data to execute lazy reader.
-			readData, err := io.ReadAll(item.ToReader())
+			r, err := readers.NewVersionedRestoreReader(item.ToReader())
+			test.versionErr(t, err, "version error: %v", clues.ToCore(err))
+
+			if err != nil {
+				return
+			}
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
+
+			readData, err := io.ReadAll(r)
 			test.readErr(t, err, clues.ToCore(err), "read error")
 			assert.Equal(t, test.expectData, readData, "read data")
 
-			_, err = item.(data.ItemInfo).Info()
+			_, err = item.Info()
 			test.infoErr(t, err, "Info(): %v", clues.ToCore(err))
 
 			e := errs.Errors()
@@ -326,15 +361,21 @@ func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() {
 	assert.Equal(
 		t,
 		now,
-		item.(data.ItemModTime).ModTime(),
+		item.ModTime(),
 		"mod time")
 
 	// Read data to execute lazy reader.
-	readData, err := io.ReadAll(item.ToReader())
+	r, err := readers.NewVersionedRestoreReader(item.ToReader())
+	require.NoError(t, err, "version error: %v", clues.ToCore(err))
+
+	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+	assert.True(t, r.Format().DelInFlight)
+
+	readData, err := io.ReadAll(r)
 	require.NoError(t, err, clues.ToCore(err), "read error")
 	assert.Empty(t, readData, "read data")
 
-	_, err = item.(data.ItemInfo).Info()
+	_, err = item.Info()
 	assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
 
 	e := errs.Errors()
@@ -366,9 +407,9 @@ func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() {
 	assert.Equal(
 		t,
 		now,
-		item.(data.ItemModTime).ModTime(),
+		item.ModTime(),
 		"mod time")
 
-	_, err := item.(data.ItemInfo).Info()
+	_, err := item.Info()
 	assert.Error(t, err, "Info() error")
 }
diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go
index 319914f1a..c5899afdf 100644
--- a/src/internal/kopia/data_collection.go
+++ b/src/internal/kopia/data_collection.go
@@ -7,6 +7,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
@@ -16,6 +17,7 @@ import (
 var (
 	_ data.RestoreCollection = &kopiaDataCollection{}
 	_ data.Item              = &kopiaDataStream{}
+	_ data.ItemSize          = &kopiaDataStream{}
 )
 
 type kopiaDataCollection struct {
@@ -23,7 +25,7 @@ type kopiaDataCollection struct {
 	dir             fs.Directory
 	items           []string
 	counter         ByteCounter
-	expectedVersion uint32
+	expectedVersion readers.SerializationVersion
 }
 
 func (kdc *kopiaDataCollection) Items(
@@ -102,7 +104,7 @@ func (kdc kopiaDataCollection) FetchItemByName(
 		return nil, clues.New("object is not a file").WithClues(ctx)
 	}
 
-	size := f.Size() - int64(versionSize)
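+	// The stored file includes the serialization version header; subtract it
+	// so the reported size covers only the item payload.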
+	size := f.Size() - int64(readers.VersionFormatSize)
 	if size < 0 {
 		logger.Ctx(ctx).Infow("negative file size; resetting to 0", "file_size", size)
 
@@ -118,13 +120,32 @@
 		return nil, clues.Wrap(err, "opening file").WithClues(ctx)
 	}
 
+	// TODO(ashmrtn): Remove this when individual services implement checks for
+	// version and deleted items.
+	rr, err := readers.NewVersionedRestoreReader(r)
+	if err != nil {
+		return nil, clues.Stack(err).WithClues(ctx)
+	}
+
+	if rr.Format().Version != kdc.expectedVersion {
+		return nil, clues.New("unexpected data format").
+			WithClues(ctx).
+			With(
+				"read_version", rr.Format().Version,
+				"expected_version", kdc.expectedVersion)
+	}
+
+	// This is a conservative check, but we shouldn't be seeing items that were
+	// deleted in flight during restores because there's no way to select them.
+	if rr.Format().DelInFlight {
+		return nil, clues.New("selected item marked as deleted in flight").
+			WithClues(ctx)
+	}
+
 	return &kopiaDataStream{
-		id: name,
-		reader: &restoreStreamReader{
-			ReadCloser:      r,
-			expectedVersion: kdc.expectedVersion,
-		},
-		size: size,
+		id:     name,
+		reader: rr,
+		size:   size,
 	}, nil
 }
diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go
index d587730ca..4b1b4a4b2 100644
--- a/src/internal/kopia/data_collection_test.go
+++ b/src/internal/kopia/data_collection_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -121,25 +122,35 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
 	)
 
 	// Needs to be a function so the readers get refreshed each time.
-	getLayout := func() fs.Directory {
+	getLayout := func(t *testing.T) fs.Directory {
+		format := readers.SerializationFormat{
+			Version: readers.DefaultSerializationVersion,
+		}
+
+		r1, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader(files[0].data)))
+		require.NoError(t, err, clues.ToCore(err))
+
+		r2, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader(files[1].data)))
+		require.NoError(t, err, clues.ToCore(err))
+
 		return virtualfs.NewStaticDirectory(encodeAsPath("foo"), []fs.Entry{
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(files[0].uuid),
 					nil),
-				r: newBackupStreamReader(
-					serializationVersion,
-					io.NopCloser(bytes.NewReader(files[0].data))),
-				size: int64(len(files[0].data) + versionSize),
+				r:    r1,
+				size: int64(len(files[0].data) + readers.VersionFormatSize),
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(files[1].uuid),
 					nil),
-				r: newBackupStreamReader(
-					serializationVersion,
-					io.NopCloser(bytes.NewReader(files[1].data))),
-				size: int64(len(files[1].data) + versionSize),
+				r:    r2,
+				size: int64(len(files[1].data) + readers.VersionFormatSize),
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
@@ -224,10 +235,10 @@
 	}
 
 	c := kopiaDataCollection{
-		dir:             getLayout(),
+		dir:             getLayout(t),
 		path:            nil,
 		items:           items,
-		expectedVersion: serializationVersion,
+		expectedVersion: readers.DefaultSerializationVersion,
 	}
 
 	var (
@@ -291,23 +302,34 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 	// Needs to be a function so we can switch the serialization version as
 	// needed.
-	getLayout := func(serVersion uint32) fs.Directory {
+	getLayout := func(
+		t *testing.T,
+		serVersion readers.SerializationVersion,
+	) fs.Directory {
+		format := readers.SerializationFormat{Version: serVersion}
+
+		r1, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader([]byte(noErrFileData))))
+		require.NoError(t, err, clues.ToCore(err))
+
+		r2, err := readers.NewVersionedBackupReader(
+			format,
+			errReader.ToReader())
+		require.NoError(t, err, clues.ToCore(err))
+
 		return virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(noErrFileName),
 					nil),
-				r: newBackupStreamReader(
-					serVersion,
-					io.NopCloser(bytes.NewReader([]byte(noErrFileData)))),
+				r: r1,
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(errFileName),
 					nil),
-				r: newBackupStreamReader(
-					serVersion,
-					errReader.ToReader()),
+				r: r2,
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
@@ -330,7 +352,7 @@
 	table := []struct {
 		name                      string
 		inputName                 string
-		inputSerializationVersion uint32
+		inputSerializationVersion readers.SerializationVersion
 		expectedData              []byte
 		lookupErr                 assert.ErrorAssertionFunc
 		readErr                   assert.ErrorAssertionFunc
@@ -339,7 +361,7 @@
 		{
 			name:                      "FileFound_NoError",
 			inputName:                 noErrFileName,
-			inputSerializationVersion: serializationVersion,
+			inputSerializationVersion: readers.DefaultSerializationVersion,
 			expectedData:              []byte(noErrFileData),
 			lookupErr:                 assert.NoError,
 			readErr:                   assert.NoError,
 		},
 		{
 			name:                      "FileFound_ReadError",
 			inputName:                 errFileName,
-			inputSerializationVersion: serializationVersion,
+			inputSerializationVersion: readers.DefaultSerializationVersion,
 			lookupErr:                 assert.NoError,
 			readErr:                   assert.Error,
 		},
 		{
 			name:                      "FileFound_VersionError",
 			inputName:                 noErrFileName,
-			inputSerializationVersion: serializationVersion + 1,
-			lookupErr:                 assert.NoError,
-			readErr:                   assert.Error,
+			inputSerializationVersion: readers.DefaultSerializationVersion + 1,
+			lookupErr:                 assert.Error,
 		},
 		{
 			name:                      "FileNotFound",
 			inputName:                 "foo",
-			inputSerializationVersion: serializationVersion + 1,
+			inputSerializationVersion: readers.DefaultSerializationVersion + 1,
 			lookupErr:                 assert.Error,
 			notFoundErr:               true,
 		},
@@ -373,14 +394,14 @@
 			ctx, flush := tester.NewContext(t)
 			defer flush()
 
-			root := getLayout(test.inputSerializationVersion)
+			root := getLayout(t, test.inputSerializationVersion)
 			c := &i64counter{}
 
 			col := &kopiaDataCollection{
 				path:            pth,
 				dir:             root,
 				counter:         c,
-				expectedVersion: serializationVersion,
+				expectedVersion: readers.DefaultSerializationVersion,
 			}
 
 			s, err := col.FetchItemByName(ctx, test.inputName)
diff --git a/src/internal/kopia/merge_collection_test.go b/src/internal/kopia/merge_collection_test.go
index f89c2dd95..fefbfbb15 100644
--- a/src/internal/kopia/merge_collection_test.go
+++ b/src/internal/kopia/merge_collection_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -150,20 +151,27 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
 	require.NoError(suite.T(), err, clues.ToCore(err))
 
 	// Needs to be a function so the readers get refreshed each time.
-	layouts := []func() fs.Directory{
+	layouts := []func(t *testing.T) fs.Directory{
 		// Has the following;
 		// - file1: data[0]
 		// - errOpen: (error opening file)
-		func() fs.Directory {
+		func(t *testing.T) fs.Directory {
+			format := readers.SerializationFormat{
+				Version: readers.DefaultSerializationVersion,
+			}
+
+			r1, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData1)))
+			require.NoError(t, err, clues.ToCore(err))
+
 			return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[0]), []fs.Entry{
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName1),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData1))),
-					size: int64(len(fileData1) + versionSize),
+					r:    r1,
+					size: int64(len(fileData1) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
@@ -178,34 +186,47 @@
 		// - file1: data[1]
 		// - file2: data[0]
 		// - errOpen: data[2]
-		func() fs.Directory {
+		func(t *testing.T) fs.Directory {
+			format := readers.SerializationFormat{
+				Version: readers.DefaultSerializationVersion,
+			}
+
+			r1, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData2)))
+			require.NoError(t, err, clues.ToCore(err))
+
+			r2, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData1)))
+			require.NoError(t, err, clues.ToCore(err))
+
+			r3, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData3)))
+			require.NoError(t, err, clues.ToCore(err))
+
 			return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[1]), []fs.Entry{
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName1),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData2))),
-					size: int64(len(fileData2) + versionSize),
+					r:    r1,
+					size: int64(len(fileData2) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName2),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData1))),
-					size: int64(len(fileData1) + versionSize),
+					r:    r2,
+					size: int64(len(fileData1) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileOpenErrName),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData3))),
-					size: int64(len(fileData3) + versionSize),
+					r:    r3,
+					size: int64(len(fileData3) + readers.VersionFormatSize),
 				},
 			})
 		},
@@ -257,9 +278,9 @@
 		for i, layout := range layouts {
 			col := &kopiaDataCollection{
 				path:            pth,
-				dir:             layout(),
+				dir:             layout(t),
 				counter:         c,
-				expectedVersion: serializationVersion,
+				expectedVersion: readers.DefaultSerializationVersion,
 			}
 
 			err := dc.addCollection(colPaths[i], col)
diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go
index bc7a1f034..c1e1351e5 100644
--- a/src/internal/kopia/upload.go
+++ b/src/internal/kopia/upload.go
@@ -13,7 +13,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-	"unsafe"
 
 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"
@@ -24,6 +23,7 @@ import (
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/diagnostics"
 	"github.com/alcionai/corso/src/internal/m365/graph"
@@ -37,7 +37,7 @@
 
 const maxInflateTraversalDepth = 500
 
-var versionSize = int(unsafe.Sizeof(serializationVersion))
+var versionSize = readers.VersionFormatSize
 
 func newBackupStreamReader(version uint32, reader io.ReadCloser) *backupStreamReader {
 	buf := make([]byte, versionSize)
@@ -436,7 +436,7 @@
 			entry := virtualfs.StreamingFileWithModTimeFromReader(
 				encodedName,
 				modTime,
-				newBackupStreamReader(serializationVersion, e.ToReader()))
+				e.ToReader())
 
 			err = ctr(ctx, entry)
 			if err != nil {
diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go
index fd74cd9fa..c88da8af0 100644
--- a/src/internal/kopia/upload_test.go
+++ b/src/internal/kopia/upload_test.go
@@ -124,12 +124,6 @@ func expectFileData(
 		return
 	}
 
-	// Need to wrap with a restore stream reader to remove the version.
-	r = &restoreStreamReader{
-		ReadCloser:      io.NopCloser(r),
-		expectedVersion: serializationVersion,
-	}
-
 	got, err := io.ReadAll(r)
 	if !assert.NoError(t, err, "reading data in file", name, clues.ToCore(err)) {
 		return
@@ -2420,9 +2414,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubtree() {
 							encodeElements(inboxFileName1)[0],
 							time.Time{},
-							// Wrap with a backup reader so it gets the version injected.
-							newBackupStreamReader(
-								serializationVersion,
-								io.NopCloser(bytes.NewReader(inboxFileData1v2)))),
+							io.NopCloser(bytes.NewReader(inboxFileData1v2))),
 					}),
 				}),
 			virtualfs.NewStaticDirectory(
@@ -2582,9 +2574,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsMigrateSubtree() {
 						virtualfs.StreamingFileWithModTimeFromReader(
 							encodeElements(inboxFileName1)[0],
 							time.Time{},
-							newBackupStreamReader(
-								serializationVersion,
-								io.NopCloser(bytes.NewReader(inboxFileData1)))),
+							io.NopCloser(bytes.NewReader(inboxFileData1))),
 					}),
 				}),
 			virtualfs.NewStaticDirectory(
@@ -2596,9 +2586,7 @@
 						virtualfs.StreamingFileWithModTimeFromReader(
 							encodeElements(contactsFileName1)[0],
 							time.Time{},
-							newBackupStreamReader(
-								serializationVersion,
-								io.NopCloser(bytes.NewReader(contactsFileData1)))),
+							io.NopCloser(bytes.NewReader(contactsFileData1))),
 					}),
 				}),
 		})
@@ -2817,15 +2805,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName5)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData5)))),
+			io.NopCloser(bytes.NewReader(fileData5))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName6)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData6)))),
+			io.NopCloser(bytes.NewReader(fileData6))),
 	})
 
 	counters[folderID3] = count
@@ -2835,15 +2819,11 @@
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName3)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData3)))),
+			io.NopCloser(bytes.NewReader(fileData3))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName4)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData4)))),
+			io.NopCloser(bytes.NewReader(fileData4))),
 		folder,
 	})
 
 	counters[folderID2] = count
@@ -2859,15 +2839,11 @@
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName1)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData1)))),
+			io.NopCloser(bytes.NewReader(fileData1))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName2)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData2)))),
+			io.NopCloser(bytes.NewReader(fileData2))),
 		folder,
 		folder4,
 	})
@@ -2879,15 +2855,11 @@
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName7)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData7)))),
+			io.NopCloser(bytes.NewReader(fileData7))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName8)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData8)))),
+			io.NopCloser(bytes.NewReader(fileData8))),
 	})
 
 	counters[folderID5] = count
diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go
index 24e0708b5..10523de6c 100644
--- a/src/internal/kopia/wrapper.go
+++ b/src/internal/kopia/wrapper.go
@@ -18,6 +18,7 @@ import (
 	"golang.org/x/exp/maps"
 
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/diagnostics"
 	"github.com/alcionai/corso/src/internal/observe"
@@ -36,8 +37,6 @@ const (
 	// possibly corresponding to who is making the backup.
 	corsoHost = "corso-host"
 	corsoUser = "corso"
-
-	serializationVersion uint32 = 1
 )
 
 // common manifest tags
@@ -447,7 +446,7 @@ func loadDirsAndItems(
 			dir:             dir,
 			items:           dirItems.items,
 			counter:         bcounter,
-			expectedVersion: serializationVersion,
+			expectedVersion: readers.DefaultSerializationVersion,
 		}
 
 		if err := mergeCol.addCollection(dirItems.dir.String(), dc); err != nil {
diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go
index 452bc4ffa..77721fc7b 100644
--- a/src/internal/kopia/wrapper_test.go
+++ b/src/internal/kopia/wrapper_test.go
@@ -1268,7 +1268,10 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
 		ctx,
 		[]identity.Reasoner{r},
 		nil,
-		[]data.BackupCollection{dc1, dc2},
+		[]data.BackupCollection{
+			dataMock.NewVersionedBackupCollection(t, dc1),
+			dataMock.NewVersionedBackupCollection(t, dc2),
+		},
 		nil,
 		nil,
 		true,
@@ -1577,12 +1580,15 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
 			})
 		}
 
-		collections = append(collections, collection)
+		collections = append(
+			collections,
+			dataMock.NewVersionedBackupCollection(t, collection))
 	}
 
 	r := NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory)
 
-	stats, deets, _, err := suite.w.ConsumeBackupCollections(
+	// Other tests check basic things about deets, so we don't re-check them here.
+	stats, _, _, err := suite.w.ConsumeBackupCollections(
 		suite.ctx,
 		[]identity.Reasoner{r},
 		nil,
@@ -1597,8 +1603,6 @@
 	require.Equal(t, stats.TotalDirectoryCount, expectedDirs)
 	require.Equal(t, stats.IgnoredErrorCount, 0)
 	require.False(t, stats.Incomplete)
-	// 6 file and 2 folder entries.
-	assert.Len(t, deets.Details().Entries, expectedFiles+2)
 
 	suite.snapshotID = manifest.ID(stats.SnapshotID)
 }
@@ -1629,7 +1633,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 		excludePrefix         bool
 		expectedCachedItems   int
 		expectedUncachedItems int
-		cols                  func() []data.BackupCollection
+		cols                  func(t *testing.T) []data.BackupCollection
 		backupIDCheck         require.ValueAssertionFunc
 		restoreCheck          assert.ErrorAssertionFunc
 	}{
 		{
@@ -1638,7 +1642,7 @@
 			excludeItem:           true,
 			expectedCachedItems:   len(suite.filesByPath) - 1,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			backupIDCheck: require.NotEmpty,
@@ -1650,7 +1654,7 @@
 			excludePrefix:         true,
 			expectedCachedItems:   len(suite.filesByPath) - 1,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			backupIDCheck: require.NotEmpty,
@@ -1661,7 +1665,7 @@
 			// No snapshot should be made since there were no changes.
 			expectedCachedItems:   0,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			// Backup doesn't run.
@@ -1671,7 +1675,7 @@
 			name:                  "NoExcludeItemWithChanges",
 			expectedCachedItems:   len(suite.filesByPath),
 			expectedUncachedItems: 1,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				c := exchMock.NewCollection(
 					suite.testPath1,
 					suite.testPath1,
 					1)
 				c.ColState = data.NotMovedState
 				c.PrevPath = suite.testPath1
 
-				return []data.BackupCollection{c}
+				return []data.BackupCollection{
+					dataMock.NewVersionedBackupCollection(t, c),
+				}
 			},
 			backupIDCheck: require.NotEmpty,
 			restoreCheck:  assert.NoError,
@@ -1717,7 +1723,7 @@
 					Manifest: man,
 					Reasons:  []identity.Reasoner{r},
 				}),
-				test.cols(),
+				test.cols(t),
 				excluded,
 				nil,
 				true,
diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go
index 19de8e0dc..423c43930 100644
--- a/src/internal/m365/collection/drive/collection.go
+++ b/src/internal/m365/collection/drive/collection.go
@@ -584,15 +584,24 @@ func (oc *Collection) streamDriveItem(
 		return progReader, nil
 	})
 
-	// We wrap the reader with a lazy reader so that the progress bar is only
-	// initialized if the file is read. Since we're not actually lazily reading
-	// data just use the eager item implementation.
-	oc.data <- data.NewUnindexedPrefetchedItem(
+	storeItem, err := data.NewUnindexedPrefetchedItem(
 		metaReader,
 		metaFileName+metaSuffix,
 		// Metadata file should always use the latest time as
 		// permissions change does not update mod time.
 		time.Now())
+	if err != nil {
+		errs.AddRecoverable(ctx, clues.Stack(err).
+			WithClues(ctx).
+			Label(fault.LabelForceNoBackupCreation))
+
+		return
+	}
+
+	// We wrap the reader with a lazy reader so that the progress bar is only
+	// initialized if the file is read. Since we're not actually lazily reading
+	// data, just use the eager item implementation.
+	oc.data <- storeItem
 
 	// Item read successfully, add to collection
 	if isFile {
diff --git a/src/internal/m365/collection/drive/collection_test.go b/src/internal/m365/collection/drive/collection_test.go
index b99a2484e..2e2f85160 100644
--- a/src/internal/m365/collection/drive/collection_test.go
+++ b/src/internal/m365/collection/drive/collection_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/stretchr/testify/suite"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
 	metaTD "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata/testdata"
@@ -256,7 +257,7 @@ func (suite *CollectionUnitSuite) TestCollection() {
 			mt := readItem.(data.ItemModTime)
 			assert.Equal(t, now, mt.ModTime())
 
-			readData, err := io.ReadAll(readItem.ToReader())
+			rr, err := readers.NewVersionedRestoreReader(readItem.ToReader())
 			test.expectErr(t, err)
 
 			if err != nil {
@@ -267,13 +268,25 @@
 				return
 			}
 
+			assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+			assert.False(t, rr.Format().DelInFlight)
+
+			readData, err := io.ReadAll(rr)
+			require.NoError(t, err, clues.ToCore(err))
+
 			assert.Equal(t, stubItemContent, readData)
 
 			readItemMeta := readItems[1]
 
 			assert.Equal(t, stubItemID+metadata.MetaFileSuffix, readItemMeta.ID())
 
+			rr, err = readers.NewVersionedRestoreReader(readItemMeta.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+			assert.False(t, rr.Format().DelInFlight)
+
 			readMeta := metadata.Metadata{}
-			err = json.NewDecoder(readItemMeta.ToReader()).Decode(&readMeta)
+			err = json.NewDecoder(rr).Decode(&readMeta)
 			require.NoError(t, err, clues.ToCore(err))
 
 			metaTD.AssertMetadataEqual(t, stubMeta, readMeta)
@@ -485,12 +498,18 @@ func (suite *CollectionUnitSuite) TestCollectionPermissionBackupLatestModTime() {
 	for _, i := range readItems {
 		if strings.HasSuffix(i.ID(), metadata.MetaFileSuffix) {
-			content, err := io.ReadAll(i.ToReader())
+			rr, err := readers.NewVersionedRestoreReader(i.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+			assert.False(t, rr.Format().DelInFlight)
+
+			content, err := io.ReadAll(rr)
 			require.NoError(t, err, clues.ToCore(err))
 			require.Equal(t, `{"filename":"Fake Item","permissionMode":1}`, string(content))
 
 			im, ok := i.(data.ItemModTime)
-			require.Equal(t, ok, true, "modtime interface")
+			require.True(t, ok, "modtime interface")
 			require.Greater(t, im.ModTime(), mtime, "permissions time greater than mod time")
 		}
 	}
diff --git a/src/internal/m365/collection/drive/collections_test.go b/src/internal/m365/collection/drive/collections_test.go
index 1b50d074a..d0e33477f 100644
--- a/src/internal/m365/collection/drive/collections_test.go
+++ b/src/internal/m365/collection/drive/collections_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
 	pmMock "github.com/alcionai/corso/src/internal/common/prefixmatcher/mock"
 	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
 	"github.com/alcionai/corso/src/internal/m365/graph"
 	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
@@ -1114,7 +1115,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestDeserializeMetadata() {
 				func(*support.ControllerOperationStatus) {})
 			require.NoError(t, err, clues.ToCore(err))
 
-			cols = append(cols, data.NoFetchRestoreCollection{Collection: mc})
+			cols = append(cols, dataMock.NewUnversionedRestoreCollection(
+				t,
+				data.NoFetchRestoreCollection{Collection: mc}))
 		}
 
 		deltas, paths, canUsePreviousBackup, err := deserializeMetadata(ctx, cols)
@@ -2211,7 +2214,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
 				func(*support.ControllerOperationStatus) {})
 			assert.NoError(t, err, "creating metadata collection", clues.ToCore(err))
 
-			prevMetadata := []data.RestoreCollection{data.NoFetchRestoreCollection{Collection: mc}}
+			prevMetadata := []data.RestoreCollection{
+				dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: mc}),
+			}
 			errs := fault.New(true)
 			delList := prefixmatcher.NewStringSetBuilder()
@@ -2238,7 +2243,9 @@
 			deltas, paths, _, err := deserializeMetadata(
 				ctx,
 				[]data.RestoreCollection{
-					data.NoFetchRestoreCollection{Collection: baseCol},
+					dataMock.NewUnversionedRestoreCollection(
+						t,
+						data.NoFetchRestoreCollection{Collection: baseCol}),
 				})
 			if !assert.NoError(t, err, "deserializing metadata", clues.ToCore(err)) {
 				continue
diff --git a/src/internal/m365/collection/exchange/backup_test.go b/src/internal/m365/collection/exchange/backup_test.go
index 4b046fd47..bb6aad27c 100644
--- a/src/internal/m365/collection/exchange/backup_test.go
+++ b/src/internal/m365/collection/exchange/backup_test.go
@@ -15,7 +15,9 @@ import (
 
 	inMock "github.com/alcionai/corso/src/internal/common/idname/mock"
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/m365/graph"
 	"github.com/alcionai/corso/src/internal/m365/support"
 	"github.com/alcionai/corso/src/internal/operations/inject"
@@ -322,7 +324,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			require.NoError(t, err, clues.ToCore(err))
 
 			cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
-				data.NoFetchRestoreCollection{Collection: coll},
+				dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: coll}),
 			})
 			test.expectError(t, err, clues.ToCore(err))
 
@@ -591,7 +593,7 @@ func (suite *BackupIntgSuite) TestDelta() {
 	require.NotNil(t, metadata, "collections contains a metadata collection")
 
 	cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
-		data.NoFetchRestoreCollection{Collection: metadata},
+		dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: metadata}),
 	})
 	require.NoError(t, err, clues.ToCore(err))
 	assert.True(t, canUsePreviousBackup, "can use previous backup")
@@ -666,7 +668,12 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() {
 			for stream := range streamChannel {
 				buf := &bytes.Buffer{}
 
-				read, err := buf.ReadFrom(stream.ToReader())
+				rr, err := readers.NewVersionedRestoreReader(stream.ToReader())
+				require.NoError(t, err, clues.ToCore(err))
+
+				assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+				read, err := buf.ReadFrom(rr)
 				assert.NoError(t, err, clues.ToCore(err))
 				assert.NotZero(t, read)
 
@@ -744,7 +751,13 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
 			for stream := range edc.Items(ctx, fault.New(true)) {
 				buf := &bytes.Buffer{}
-				read, err := buf.ReadFrom(stream.ToReader())
+
+				rr, err := readers.NewVersionedRestoreReader(stream.ToReader())
+				require.NoError(t, err, clues.ToCore(err))
+
+				assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+				read, err := buf.ReadFrom(rr)
 				assert.NoError(t, err, clues.ToCore(err))
 				assert.NotZero(t, read)
 
@@ -878,7 +891,12 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() {
 			for item := range edc.Items(ctx, fault.New(true)) {
 				buf := &bytes.Buffer{}
 
-				read, err := buf.ReadFrom(item.ToReader())
+				rr, err := readers.NewVersionedRestoreReader(item.ToReader())
+				require.NoError(t, err, clues.ToCore(err))
+
+				assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+				read, err := buf.ReadFrom(rr)
 				assert.NoError(t, err, clues.ToCore(err))
 				assert.NotZero(t, read)
 
@@ -1198,7 +1216,9 @@ func checkMetadata(
 ) {
 	catPaths, _, err := ParseMetadataCollections(
 		ctx,
-		[]data.RestoreCollection{data.NoFetchRestoreCollection{Collection: c}})
+		[]data.RestoreCollection{
+			dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: c}),
+		})
 	if !assert.NoError(t, err, "getting metadata", clues.ToCore(err)) {
 		return
 	}
diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go
index 30f28672d..71b9bb01b 100644
--- a/src/internal/m365/collection/exchange/collection.go
+++ b/src/internal/m365/collection/exchange/collection.go
@@ -278,10 +278,21 @@ func (col *prefetchCollection) streamItems(
 				return
 			}
 
-			stream <- data.NewPrefetchedItem(
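+			// NewPrefetchedItem wraps the serialized bytes so the versioned
+			// serialization header is prepended when the item is read.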
+			item, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(itemData)),
 				id,
 				details.ItemInfo{Exchange: info})
+			if err != nil {
+				el.AddRecoverable(
+					ctx,
+					clues.Stack(err).
+						WithClues(ctx).
+						Label(fault.LabelForceNoBackupCreation))
+
+				return
+			}
+
+			stream <- item
 
 			atomic.AddInt64(&success, 1)
 			atomic.AddInt64(&totalBytes, info.Size)
diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go
index 5e1665faa..f373bd1a5 100644
--- a/src/internal/m365/collection/exchange/collection_test.go
+++ b/src/internal/m365/collection/exchange/collection_test.go
@@ -17,6 +17,7 @@ import (
 	"golang.org/x/exp/slices"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/exchange/mock"
 	"github.com/alcionai/corso/src/internal/m365/graph"
@@ -55,13 +56,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			ed := data.NewPrefetchedItem(
+			ed, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(test.readData)),
 				"itemID",
 				details.ItemInfo{})
+			require.NoError(t, err, clues.ToCore(err))
+
+			r, err := readers.NewVersionedRestoreReader(ed.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
 
 			buf := &bytes.Buffer{}
-			_, err := buf.ReadFrom(ed.ToReader())
+			_, err = buf.ReadFrom(r)
 			assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
 			assert.Equal(t, test.readData, buf.Bytes(), "read data")
 			assert.Equal(t, "itemID", ed.ID(), "item ID")
@@ -493,11 +501,11 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() {
 		time.Now(),
 		fault.New(true))
 
-	_, err := li.(data.ItemInfo).Info()
+	_, err := li.Info()
 	assert.Error(suite.T(), err, "Info without reading data should error")
 }
 
-func (suite *CollectionUnitSuite) TestLazyItem() {
+func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() {
 	var (
 		parentPath = "inbox/private/silly cats"
 		now        = time.Now()
 	)
 
 	table := []struct {
 		name              string
-		modTime           time.Time
 		getErr            error
 		serializeErr      error
-		expectModTime     time.Time
 		expectReadErrType error
-		dataCheck         assert.ValueAssertionFunc
-		expectInfoErr     bool
-		expectInfoErrType error
 	}{
-		{
-			name:              "ReturnsEmptyReaderOnDeletedInFlight",
-			modTime:           now,
-			getErr:            graph.ErrDeletedInFlight,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
-			expectInfoErrType: data.ErrNotFound,
-		},
-		{
-			name:          "ReturnsValidReaderAndInfo",
-			modTime:       now,
-			dataCheck:     assert.NotEmpty,
-			expectModTime: now,
-		},
 		{
 			name:              "ReturnsErrorOnGenericGetError",
-			modTime:           now,
 			getErr:            assert.AnError,
 			expectReadErrType: assert.AnError,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
 		},
 		{
 			name:              "ReturnsErrorOnGenericSerializeError",
-			modTime:           now,
 			serializeErr:      assert.AnError,
 			expectReadErrType: assert.AnError,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
 		},
 	}
 
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
 			ctx, flush := tester.NewContext(t)
 			defer flush()
 
 			getter := &mock.ItemGetSerialize{
 				GetErr:       test.getErr,
 				SerializeErr: test.serializeErr,
 			}
 
 			li := data.NewLazyItem(
 				ctx,
 				&lazyItemGetter{
 					userID:       "userID",
 					itemID:       "itemID",
 					getter:       getter,
-					modTime:      test.modTime,
+					modTime:      now,
 					immutableIDs: false,
 					parentPath:   parentPath,
 				},
 				"itemID",
-				test.modTime,
+				now,
 				fault.New(true))
 
 			assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
 
-			assert.Equal(
-				t,
-				test.modTime,
-				li.(data.ItemModTime).ModTime(),
-				"mod time")
+			assert.Equal(t, now, li.ModTime(), "item mod time")
 
-			readData, err := io.ReadAll(li.ToReader())
-			if test.expectReadErrType == nil {
-				assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
-			} else {
-				assert.ErrorIs(t, err, test.expectReadErrType, "read error")
-			}
-
-			test.dataCheck(t, readData, "read item data")
-
-			info, err := li.(data.ItemInfo).Info()
-
-			// Didn't expect an error getting info, it should be valid.
-			if !test.expectInfoErr {
-				assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))
-				assert.Equal(t, parentPath, info.Exchange.ParentPath)
-				assert.Equal(t, test.expectModTime, info.Modified())
-
-				return
-			}
+			_, err := readers.NewVersionedRestoreReader(li.ToReader())
+			assert.ErrorIs(t, err, test.expectReadErrType)
 
 			// Should get some form of error when trying to get info.
+			_, err = li.Info()
 			assert.Error(t, err, "Info()")
-
-			if test.expectInfoErrType != nil {
-				assert.ErrorIs(t, err, test.expectInfoErrType, "Info() error")
-			}
 		})
 	}
 }
+
+func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() {
+	var (
+		t = suite.T()
+
+		parentPath = "inbox/private/silly cats"
+		now        = time.Now()
+	)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	getter := &mock.ItemGetSerialize{GetErr: graph.ErrDeletedInFlight}
+
+	li := data.NewLazyItem(
+		ctx,
+		&lazyItemGetter{
+			userID:       "userID",
+			itemID:       "itemID",
+			getter:       getter,
+			modTime:      now,
+			immutableIDs: false,
+			parentPath:   parentPath,
+		},
+		"itemID",
+		now,
+		fault.New(true))
+
+	assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
+	assert.Equal(
+		t,
+		now,
+		li.ModTime(),
+		"item mod time")
+
+	r, err := readers.NewVersionedRestoreReader(li.ToReader())
+	require.NoError(t, err, clues.ToCore(err))
+
+	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+	assert.True(t, r.Format().DelInFlight)
+
+	readData, err := io.ReadAll(r)
+	assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
+
+	assert.Empty(t, readData, "read item data")
+
+	_, err = li.Info()
+	assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
+}
+
+func (suite *CollectionUnitSuite) TestLazyItem() {
+	var (
+		t = suite.T()
+
+		parentPath = "inbox/private/silly cats"
+		now        = time.Now()
+	)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	// Exact data type doesn't really matter.
+	testData := models.NewMessage()
+	testData.SetSubject(ptr.To("hello world"))
+
+	getter := &mock.ItemGetSerialize{GetData: testData}
+
+	li := data.NewLazyItem(
+		ctx,
+		&lazyItemGetter{
+			userID:       "userID",
+			itemID:       "itemID",
+			getter:       getter,
+			modTime:      now,
+			immutableIDs: false,
+			parentPath:   parentPath,
+		},
+		"itemID",
+		now,
+		fault.New(true))
+
+	assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
+	assert.Equal(
+		t,
+		now,
+		li.ModTime(),
+		"item mod time")
+
+	r, err := readers.NewVersionedRestoreReader(li.ToReader())
+	require.NoError(t, err, clues.ToCore(err))
+
+	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+	assert.False(t, r.Format().DelInFlight)
+
+	readData, err := io.ReadAll(r)
+	assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
+
+	assert.NotEmpty(t, readData, "read item data")
+
+	info, err := li.Info()
+	assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))
+
+	assert.Equal(t, parentPath, info.Exchange.ParentPath)
+	assert.Equal(t, now, info.Modified())
+}
diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go
index b8ff3b436..0a1ca7212 100644
--- a/src/internal/m365/collection/groups/collection.go
+++ b/src/internal/m365/collection/groups/collection.go
@@ -150,27 +150,47 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
 				parentFolderID,
 				id)
 			if err != nil {
-				el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer"))
+				el.AddRecoverable(
+					ctx,
+					clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
+
 				return
 			}
 
 			if err := writer.WriteObjectValue("", item); err != nil {
-				el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer"))
+				el.AddRecoverable(
+					ctx,
+					clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
+
 				return
 			}
 
 			itemData, err := writer.GetSerializedContent()
 			if err != nil {
-				el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message"))
+				el.AddRecoverable(
+					ctx,
+					clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation))
+
 				return
 			}
 
 			info.ParentPath = col.LocationPath().String()
 
-			col.stream <- data.NewPrefetchedItem(
+			storeItem, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(itemData)),
 				id,
 				details.ItemInfo{Groups: info})
+			if err != nil {
+				el.AddRecoverable(
+					ctx,
+					clues.Stack(err).
+						WithClues(ctx).
+						Label(fault.LabelForceNoBackupCreation))
+
+				return
+			}
+
+			col.stream <- storeItem
 
 			atomic.AddInt64(&streamedItems, 1)
 			atomic.AddInt64(&totalBytes, info.Size)
diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go
index be4c52dc7..e0bf19d19 100644
--- a/src/internal/m365/collection/groups/collection_test.go
+++ b/src/internal/m365/collection/groups/collection_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/groups/mock"
 	"github.com/alcionai/corso/src/internal/m365/support"
@@ -48,13 +49,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			ed := data.NewPrefetchedItem(
+			ed, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(test.readData)),
 				"itemID",
 				details.ItemInfo{})
+			require.NoError(t, err, clues.ToCore(err))
+
+			r, err := readers.NewVersionedRestoreReader(ed.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
 
 			buf := &bytes.Buffer{}
-			_, err := buf.ReadFrom(ed.ToReader())
+			_, err = buf.ReadFrom(r)
 			assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
 			assert.Equal(t, test.readData, buf.Bytes(), "read data")
 			assert.Equal(t, "itemID", ed.ID(), "item ID")
diff --git a/src/internal/m365/collection/site/collection.go b/src/internal/m365/collection/site/collection.go
index 422ed4b2a..8af643d4b 100644
--- a/src/internal/m365/collection/site/collection.go
+++ b/src/internal/m365/collection/site/collection.go
@@ -211,11 +211,17 @@ func (sc *Collection) retrieveLists(
 			metrics.Bytes += size
 
 			metrics.Successes++
-			sc.data <- data.NewPrefetchedItem(
+
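+			// As with the other services, NewPrefetchedItem prepends the
+			// serialization version header to the stored list bytes.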
+			item, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(byteArray)),
 				ptr.Val(lst.GetId()),
 				details.ItemInfo{SharePoint: ListToSPInfo(lst, size)})
+			if err != nil {
+				el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
+				continue
+			}
+
+			sc.data <- item
 			progress <- struct{}{}
 		}
 	}
@@ -272,11 +278,17 @@ func (sc *Collection) retrievePages(
 		if size > 0 {
 			metrics.Bytes += size
 
 			metrics.Successes++
-			sc.data <- data.NewPrefetchedItem(
+
+			item, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(byteArray)),
 				ptr.Val(pg.GetId()),
 				details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)})
+			if err != nil {
+				el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
+				continue
+			}
+
+			sc.data <- item
 			progress <- struct{}{}
 		}
 	}
diff --git a/src/internal/m365/collection/site/collection_test.go b/src/internal/m365/collection/site/collection_test.go
index 3d0336217..5b53513f0 100644
--- a/src/internal/m365/collection/site/collection_test.go
+++ b/src/internal/m365/collection/site/collection_test.go
@@ -103,10 +103,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 				byteArray, err := ow.GetSerializedContent()
 				require.NoError(t, err, clues.ToCore(err))
 
-				data := data.NewPrefetchedItem(
+				data, err := data.NewPrefetchedItem(
 					io.NopCloser(bytes.NewReader(byteArray)),
 					name,
 					details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))})
+				require.NoError(t, err, clues.ToCore(err))
 
 				return data
 			},
@@ -132,10 +133,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
 				page, err := betaAPI.CreatePageFromBytes(byteArray)
 				require.NoError(t, err, clues.ToCore(err))
 
-				data := data.NewPrefetchedItem(
+				data, err := data.NewPrefetchedItem(
 					io.NopCloser(bytes.NewReader(byteArray)),
 					itemName,
 					details.ItemInfo{SharePoint: betaAPI.PageInfo(page, int64(len(byteArray)))})
+				require.NoError(t, err, clues.ToCore(err))
 
 				return data
 			},
@@ -194,10 +196,11 @@ func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
 	byteArray, err := service.Serialize(listing)
 	require.NoError(t, err, clues.ToCore(err))
 
-	listData := data.NewPrefetchedItem(
+	listData, err := data.NewPrefetchedItem(
 		io.NopCloser(bytes.NewReader(byteArray)),
 		testName,
 		details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))})
+	require.NoError(t, err, clues.ToCore(err))
 
 	destName := testdata.DefaultRestoreConfig("").Location
diff --git a/src/internal/m365/graph/metadata_collection.go b/src/internal/m365/graph/metadata_collection.go
index 7e06faaba..1c3d1f766 100644
--- a/src/internal/m365/graph/metadata_collection.go
+++ b/src/internal/m365/graph/metadata_collection.go
@@ -57,11 +57,16 @@ func (mce MetadataCollectionEntry) toMetadataItem() (metadataItem, error) {
 		return metadataItem{}, clues.Wrap(err, "serializing metadata")
 	}
 
+	item, err := data.NewUnindexedPrefetchedItem(
+		io.NopCloser(buf),
+		mce.fileName,
+		time.Now())
+	if err != nil {
+		return metadataItem{}, clues.Stack(err)
+	}
+
 	return metadataItem{
-		Item: data.NewUnindexedPrefetchedItem(
-			io.NopCloser(buf),
-			mce.fileName,
-			time.Now()),
+		Item: item,
 		size: int64(buf.Len()),
 	}, nil
 }
diff --git a/src/internal/m365/graph/metadata_collection_test.go b/src/internal/m365/graph/metadata_collection_test.go
index 0423cdf40..ee9ca6b5c 100644
--- a/src/internal/m365/graph/metadata_collection_test.go
+++ b/src/internal/m365/graph/metadata_collection_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/support"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -69,13 +70,16 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
 	items := []metadataItem{}
 
 	for i := 0; i < len(itemNames); i++ {
+		item, err := data.NewUnindexedPrefetchedItem(
+			io.NopCloser(bytes.NewReader(itemData[i])),
+			itemNames[i],
+			time.Time{})
+		require.NoError(t, err, clues.ToCore(err))
+
 		items = append(
 			items,
 			metadataItem{
-				Item: data.NewUnindexedPrefetchedItem(
-					io.NopCloser(bytes.NewReader(itemData[i])),
-					itemNames[i],
-					time.Time{}),
+				Item: item,
 				size: int64(len(itemData[i])),
 			})
 	}
@@ -103,7 +107,13 @@
 	for s := range c.Items(ctx, fault.New(true)) {
 		gotNames = append(gotNames, s.ID())
 
-		buf, err := io.ReadAll(s.ToReader())
+		rr, err := readers.NewVersionedRestoreReader(s.ToReader())
+		require.NoError(t, err, clues.ToCore(err))
+
+		assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+		assert.False(t, rr.Format().DelInFlight)
+
+		buf, err := io.ReadAll(rr)
 		if !assert.NoError(t, err, clues.ToCore(err)) {
 			continue
 		}
@@ -204,11 +214,17 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
 			for item := range col.Items(ctx, fault.New(true)) {
 				assert.Equal(t, test.metadata.fileName, item.ID())
 
+				rr, err := readers.NewVersionedRestoreReader(item.ToReader())
+				require.NoError(t, err, clues.ToCore(err))
+
+				assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+				assert.False(t, rr.Format().DelInFlight)
+
 				gotMap := map[string]string{}
-				decoder := json.NewDecoder(item.ToReader())
+				decoder := json.NewDecoder(rr)
 				itemCount++
 
-				err := decoder.Decode(&gotMap)
+				err = decoder.Decode(&gotMap)
 				if !assert.NoError(t, err, clues.ToCore(err)) {
 					continue
 				}
diff --git a/src/internal/m365/helper_test.go b/src/internal/m365/helper_test.go
index aee2c11bb..6f3907394 100644
--- a/src/internal/m365/helper_test.go
+++ b/src/internal/m365/helper_test.go
@@ -16,6 +16,7 @@ import (
 	"golang.org/x/exp/slices"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
 	odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub"
@@ -573,7 +574,12 @@ func compareExchangeEmail(
 	expected map[string][]byte,
 	item data.Item,
 ) {
-	itemData, err := io.ReadAll(item.ToReader())
+	rr := versionedReadWrapper(t, item.ToReader())
+	if rr == nil {
+		return
+	}
+
+	itemData, err := io.ReadAll(rr)
 	if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
 		return
 	}
@@ -600,7 +606,12 @@ func compareExchangeContact(
 	expected map[string][]byte,
 	item data.Item,
 ) {
-	itemData, err := io.ReadAll(item.ToReader())
+	rr := versionedReadWrapper(t, item.ToReader())
+	if rr == nil {
+		return
+	}
+
+	itemData, err := io.ReadAll(rr)
 	if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
 		return
 	}
@@ -628,7 +639,12 @@ func compareExchangeEvent(
 	expected map[string][]byte,
 	item data.Item,
 ) {
-	itemData, err := io.ReadAll(item.ToReader())
+	rr := versionedReadWrapper(t, item.ToReader())
+	if rr == nil {
+		return
+	}
+
+	itemData, err := io.ReadAll(rr)
 	if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
 		return
 	}
@@ -718,7 +734,12 @@ func compareDriveItem(
 		return false
 	}
 
-	buf, err := io.ReadAll(item.ToReader())
+	rr := versionedReadWrapper(t, item.ToReader())
+	if rr == nil {
+		return true
+	}
+
+	buf, err := io.ReadAll(rr)
 	if !assert.NoError(t, err, clues.ToCore(err)) {
 		return true
 	}
@@ -850,6 +871,29 @@ func compareDriveItem(
 	return true
 }
 
+// versionedReadWrapper strips out the version format header and checks that it
+// meets the current standard for all service types. If it doesn't meet the
+// standard, returns nil. Else returns the VersionedRestoreReader.
+func versionedReadWrapper(
+	t *testing.T,
+	reader io.ReadCloser,
+) io.ReadCloser {
+	rr, err := readers.NewVersionedRestoreReader(reader)
+	if !assert.NoError(t, err, clues.ToCore(err)) {
+		return nil
+	}
+
+	if !assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) {
+		return nil
+	}
+
+	if !assert.False(t, rr.Format().DelInFlight) {
+		return nil
+	}
+
+	return rr
+}
+
 // compareItem compares the data returned by backup with the expected data.
 // Returns true if a comparison was done else false. Bool return is mostly used
 // to exclude OneDrive permissions for the root right now.
diff --git a/src/internal/m365/service/sharepoint/api/pages_test.go b/src/internal/m365/service/sharepoint/api/pages_test.go
index f462805d2..792e3eda0 100644
--- a/src/internal/m365/service/sharepoint/api/pages_test.go
+++ b/src/internal/m365/service/sharepoint/api/pages_test.go
@@ -109,10 +109,11 @@ func (suite *SharePointPageSuite) TestRestoreSinglePage() {
 	//nolint:lll
 	byteArray := spMock.Page("Byte Test")
 
-	pageData := data.NewUnindexedPrefetchedItem(
+	pageData, err := data.NewUnindexedPrefetchedItem(
 		io.NopCloser(bytes.NewReader(byteArray)),
 		testName,
 		time.Now())
+	require.NoError(t, err, clues.ToCore(err))
 
 	info, err := api.RestoreSitePage(
 		ctx,
diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go
index eb5673196..9246a9325 100644
--- a/src/internal/streamstore/streamstore.go
+++ b/src/internal/streamstore/streamstore.go
@@ -182,12 +182,17 @@ func collect(
 		return nil, clues.Wrap(err, "marshalling body").WithClues(ctx)
 	}
 
+	item, err := data.NewUnindexedPrefetchedItem(
+		io.NopCloser(bytes.NewReader(bs)),
+		col.itemName,
+		time.Now())
+	if err != nil {
+		return nil, clues.Stack(err).WithClues(ctx)
+	}
+
 	dc := streamCollection{
 		folderPath: p,
-		item: data.NewUnindexedPrefetchedItem(
-			io.NopCloser(bytes.NewReader(bs)),
-			col.itemName,
-			time.Now()),
+		item:       item,
 	}
 
 	return &dc, nil