From f0ccf35b5c8b1ec4f7085ae2a16816ead6d0da47 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:16:01 -0700 Subject: [PATCH] Inject versions in generic item (#4409) Leverage the generic item struct to inject serialization format information for all items Unwires the old code that injected versions in kopia wrapper but leaves some code in the wrapper to strip out the serialization format during restore Future PRs should move the process of pulling out serialization format to individual services Viewing by commit may make review easier --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * #4328 #### Test Plan - [x] :muscle: Manual - [x] :zap: Unit test - [x] :green_heart: E2E --- .../common/readers/serialization_version.go | 10 +- src/internal/data/item.go | 65 ++++--- src/internal/data/item_test.go | 71 +++++-- src/internal/kopia/data_collection.go | 37 +++- src/internal/kopia/data_collection_test.go | 75 +++++--- src/internal/kopia/merge_collection_test.go | 63 ++++-- src/internal/kopia/upload.go | 6 +- src/internal/kopia/upload_test.go | 50 ++--- src/internal/kopia/wrapper.go | 5 +- src/internal/kopia/wrapper_test.go | 30 +-- .../m365/collection/drive/collection.go | 17 +- .../m365/collection/drive/collection_test.go | 27 ++- .../m365/collection/drive/collections_test.go | 13 +- .../m365/collection/exchange/backup_test.go | 32 ++- .../m365/collection/exchange/collection.go | 13 +- .../collection/exchange/collection_test.go | 182 ++++++++++++------ .../m365/collection/groups/collection.go | 28 ++- .../m365/collection/groups/collection_test.go | 12 +- .../m365/collection/site/collection.go | 16 +- .../m365/collection/site/collection_test.go | 9 +- .../m365/graph/metadata_collection.go | 13 +- .../m365/graph/metadata_collection_test.go | 30 ++- src/internal/m365/helper_test.go | 52 ++++- .../m365/service/sharepoint/api/pages_test.go | 3 +- src/internal/streamstore/streamstore.go | 13 +- 25 files changed, 606 insertions(+), 266 deletions(-) diff --git a/src/internal/common/readers/serialization_version.go b/src/internal/common/readers/serialization_version.go index a6713f959..f203c3233 100644 --- a/src/internal/common/readers/serialization_version.go +++ b/src/internal/common/readers/serialization_version.go @@ -38,8 +38,8 @@ type SerializationVersion uint16 const DefaultSerializationVersion SerializationVersion = 1 const ( - versionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0))) - delInFlightMask persistedSerializationVersion = 1 << ((versionFormatSize * 8) - 1) + VersionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0))) + delInFlightMask persistedSerializationVersion = 1 << ((VersionFormatSize * 8) - 1) ) // SerializationFormat is a struct describing serialization format versions and @@ -68,7 +68,7 @@ func NewVersionedBackupReader( formattedVersion |= delInFlightMask } - formattedBuf := make([]byte, versionFormatSize) + formattedBuf := make([]byte, VersionFormatSize) binary.BigEndian.PutUint32(formattedBuf, formattedVersion) versionReader := io.NopCloser(bytes.NewReader(formattedBuf)) @@ -139,10 +139,10 @@ func (vbr *versionedBackupReader) Close() error { func NewVersionedRestoreReader( baseReader io.ReadCloser, ) (*VersionedRestoreReader, error) { - versionBuf := make([]byte, versionFormatSize) + versionBuf := make([]byte, VersionFormatSize) // Loop to account for the unlikely case where we get a short read. - for read := 0; read < versionFormatSize; { + for read := 0; read < VersionFormatSize; { n, err := baseReader.Read(versionBuf[read:]) if err != nil { return nil, clues.Wrap(err, "reading serialization version") diff --git a/src/internal/data/item.go b/src/internal/data/item.go index 6d316ad6b..c6cb064e7 100644 --- a/src/internal/data/item.go +++ b/src/internal/data/item.go @@ -1,7 +1,6 @@ package data import ( - "bytes" "context" "io" "sync" @@ -10,6 +9,7 @@ import ( "github.com/alcionai/clues" "github.com/spatialcurrent/go-lazy/pkg/lazy" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" @@ -46,12 +46,19 @@ func NewUnindexedPrefetchedItem( reader io.ReadCloser, itemID string, modTime time.Time, -) Item { +) (*unindexedPrefetchedItem, error) { + r, err := readers.NewVersionedBackupReader( + readers.SerializationFormat{Version: readers.DefaultSerializationVersion}, + reader) + if err != nil { + return nil, clues.Stack(err) + } + return &unindexedPrefetchedItem{ id: itemID, - reader: reader, + reader: r, modTime: modTime, - } + }, nil } // unindexedPrefetchedItem represents a single item retrieved from the remote @@ -92,15 +99,16 @@ func NewPrefetchedItem( reader io.ReadCloser, itemID string, info details.ItemInfo, -) Item { - return &prefetchedItem{ - unindexedPrefetchedItem: unindexedPrefetchedItem{ - id: itemID, - reader: reader, - modTime: info.Modified(), - }, - info: info, +) (*prefetchedItem, error) { + inner, err := NewUnindexedPrefetchedItem(reader, itemID, info.Modified()) + if err != nil { + return nil, clues.Stack(err) } + + return &prefetchedItem{ + unindexedPrefetchedItem: inner, + info: info, + }, nil } // prefetchedItem represents a single item retrieved from the remote service. @@ -108,7 +116,7 @@ func NewPrefetchedItem( // This item implements ItemInfo so it should be used for things that need to // appear in backup details. type prefetchedItem struct { - unindexedPrefetchedItem + *unindexedPrefetchedItem info details.ItemInfo } @@ -129,7 +137,7 @@ func NewUnindexedLazyItem( itemID string, modTime time.Time, errs *fault.Bus, -) Item { +) *unindexedLazyItem { return &unindexedLazyItem{ ctx: ctx, id: itemID, @@ -182,6 +190,10 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser { return nil, clues.Stack(err) } + format := readers.SerializationFormat{ + Version: readers.DefaultSerializationVersion, + } + // If an item was deleted then return an empty file so we don't fail the // backup and return a sentinel error when asked for ItemInfo so we don't // display the item in the backup. @@ -193,13 +205,17 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser { logger.Ctx(i.ctx).Info("item not found") i.delInFlight = true + format.DelInFlight = true + r, err := readers.NewVersionedBackupReader(format) - return io.NopCloser(bytes.NewReader([]byte{})), nil + return r, clues.Stack(err).OrNil() } i.info = info - return reader, nil + r, err := readers.NewVersionedBackupReader(format, reader) + + return r, clues.Stack(err).OrNil() }) } @@ -217,15 +233,14 @@ func NewLazyItem( itemID string, modTime time.Time, errs *fault.Bus, -) Item { +) *lazyItem { return &lazyItem{ - unindexedLazyItem: unindexedLazyItem{ - ctx: ctx, - id: itemID, - itemGetter: itemGetter, - modTime: modTime, - errs: errs, - }, + unindexedLazyItem: NewUnindexedLazyItem( + ctx, + itemGetter, + itemID, + modTime, + errs), } } @@ -236,7 +251,7 @@ func NewLazyItem( // This item implements ItemInfo so it should be used for things that need to // appear in backup details. type lazyItem struct { - unindexedLazyItem + *unindexedLazyItem } func (i *lazyItem) Info() (details.ItemInfo, error) { diff --git a/src/internal/data/item_test.go b/src/internal/data/item_test.go index 9484613e4..f0c7e9009 100644 --- a/src/internal/data/item_test.go +++ b/src/internal/data/item_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" @@ -50,11 +51,15 @@ func TestItemUnitSuite(t *testing.T) { } func (suite *ItemUnitSuite) TestUnindexedPrefetchedItem() { - prefetch := data.NewUnindexedPrefetchedItem( + prefetch, err := data.NewUnindexedPrefetchedItem( io.NopCloser(bytes.NewReader([]byte{})), "foo", time.Time{}) - _, ok := prefetch.(data.ItemInfo) + require.NoError(suite.T(), err, clues.ToCore(err)) + + var item data.Item = prefetch + + _, ok := item.(data.ItemInfo) assert.False(suite.T(), ok, "unindexedPrefetchedItem implements Info()") } @@ -70,7 +75,10 @@ func (suite *ItemUnitSuite) TestUnindexedLazyItem() { "foo", time.Time{}, fault.New(true)) - _, ok := lazy.(data.ItemInfo) + + var item data.Item = lazy + + _, ok := item.(data.ItemInfo) assert.False(t, ok, "unindexedLazyItem implements Info()") } @@ -140,18 +148,29 @@ func (suite *ItemUnitSuite) TestPrefetchedItem() { suite.Run(test.name, func() { t := suite.T() - item := data.NewPrefetchedItem(test.reader, id, test.info) + item, err := data.NewPrefetchedItem(test.reader, id, test.info) + require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, id, item.ID(), "ID") assert.False(t, item.Deleted(), "deleted") assert.Equal( t, test.info.Modified(), - item.(data.ItemModTime).ModTime(), + item.ModTime(), "mod time") - readData, err := io.ReadAll(item.ToReader()) - test.readErr(t, err, clues.ToCore(err), "read error") + r, err := readers.NewVersionedRestoreReader(item.ToReader()) + require.NoError(t, err, "version error: %v", clues.ToCore(err)) + + if err != nil { + return + } + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + test.readErr(t, err, "read error: %v", clues.ToCore(err)) assert.Equal(t, test.expectData, readData, "read data") }) } @@ -194,6 +213,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { table := []struct { name string mid *mockItemDataGetter + versionErr assert.ErrorAssertionFunc readErr assert.ErrorAssertionFunc infoErr assert.ErrorAssertionFunc expectData []byte @@ -205,6 +225,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { reader: io.NopCloser(bytes.NewReader([]byte{})), info: &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}}, }, + versionErr: assert.NoError, readErr: assert.NoError, infoErr: assert.NoError, expectData: []byte{}, @@ -215,6 +236,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { reader: io.NopCloser(bytes.NewReader(baseData)), info: &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}}, }, + versionErr: assert.NoError, readErr: assert.NoError, infoErr: assert.NoError, expectData: baseData, @@ -225,6 +247,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { reader: io.NopCloser(bytes.NewReader(baseData)), info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}}, }, + versionErr: assert.NoError, readErr: assert.NoError, infoErr: assert.NoError, expectData: baseData, @@ -234,6 +257,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { mid: &mockItemDataGetter{ err: assert.AnError, }, + versionErr: assert.Error, readErr: assert.Error, infoErr: assert.Error, expectData: []byte{}, @@ -249,6 +273,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { }, info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}}, }, + versionErr: assert.NoError, readErr: assert.Error, infoErr: assert.NoError, expectData: baseData[:5], @@ -278,15 +303,25 @@ func (suite *ItemUnitSuite) TestLazyItem() { assert.Equal( t, now, - item.(data.ItemModTime).ModTime(), + item.ModTime(), "mod time") // Read data to execute lazy reader. - readData, err := io.ReadAll(item.ToReader()) + r, err := readers.NewVersionedRestoreReader(item.ToReader()) + test.versionErr(t, err, "version error: %v", clues.ToCore(err)) + + if err != nil { + return + } + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) test.readErr(t, err, clues.ToCore(err), "read error") assert.Equal(t, test.expectData, readData, "read data") - _, err = item.(data.ItemInfo).Info() + _, err = item.Info() test.infoErr(t, err, "Info(): %v", clues.ToCore(err)) e := errs.Errors() @@ -326,15 +361,21 @@ func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() { assert.Equal( t, now, - item.(data.ItemModTime).ModTime(), + item.ModTime(), "mod time") // Read data to execute lazy reader. - readData, err := io.ReadAll(item.ToReader()) + r, err := readers.NewVersionedRestoreReader(item.ToReader()) + require.NoError(t, err, "version error: %v", clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.True(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) require.NoError(t, err, clues.ToCore(err), "read error") assert.Empty(t, readData, "read data") - _, err = item.(data.ItemInfo).Info() + _, err = item.Info() assert.ErrorIs(t, err, data.ErrNotFound, "Info() error") e := errs.Errors() @@ -366,9 +407,9 @@ func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() { assert.Equal( t, now, - item.(data.ItemModTime).ModTime(), + item.ModTime(), "mod time") - _, err := item.(data.ItemInfo).Info() + _, err := item.Info() assert.Error(t, err, "Info() error") } diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 319914f1a..c5899afdf 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/clues" "github.com/kopia/kopia/fs" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" @@ -16,6 +17,7 @@ import ( var ( _ data.RestoreCollection = &kopiaDataCollection{} _ data.Item = &kopiaDataStream{} + _ data.ItemSize = &kopiaDataStream{} ) type kopiaDataCollection struct { @@ -23,7 +25,7 @@ type kopiaDataCollection struct { dir fs.Directory items []string counter ByteCounter - expectedVersion uint32 + expectedVersion readers.SerializationVersion } func (kdc *kopiaDataCollection) Items( @@ -102,7 +104,7 @@ func (kdc kopiaDataCollection) FetchItemByName( return nil, clues.New("object is not a file").WithClues(ctx) } - size := f.Size() - int64(versionSize) + size := f.Size() - int64(readers.VersionFormatSize) if size < 0 { logger.Ctx(ctx).Infow("negative file size; resetting to 0", "file_size", size) @@ -118,13 +120,32 @@ func (kdc kopiaDataCollection) FetchItemByName( return nil, clues.Wrap(err, "opening file").WithClues(ctx) } + // TODO(ashmrtn): Remove this when individual services implement checks for + // version and deleted items. + rr, err := readers.NewVersionedRestoreReader(r) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + if rr.Format().Version != kdc.expectedVersion { + return nil, clues.New("unexpected data format"). + WithClues(ctx). + With( + "read_version", rr.Format().Version, + "expected_version", kdc.expectedVersion) + } + + // This is a conservative check, but we shouldn't be seeing items that were + // deleted in flight during restores because there's no way to select them. + if rr.Format().DelInFlight { + return nil, clues.New("selected item marked as deleted in flight"). + WithClues(ctx) + } + return &kopiaDataStream{ - id: name, - reader: &restoreStreamReader{ - ReadCloser: r, - expectedVersion: kdc.expectedVersion, - }, - size: size, + id: name, + reader: rr, + size: size, }, nil } diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go index d587730ca..4b1b4a4b2 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" dataMock "github.com/alcionai/corso/src/internal/data/mock" "github.com/alcionai/corso/src/internal/tester" @@ -121,25 +122,35 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { ) // Needs to be a function so the readers get refreshed each time. - getLayout := func() fs.Directory { + getLayout := func(t *testing.T) fs.Directory { + format := readers.SerializationFormat{ + Version: readers.DefaultSerializationVersion, + } + + r1, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(files[0].data))) + require.NoError(t, err, clues.ToCore(err)) + + r2, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(files[1].data))) + require.NoError(t, err, clues.ToCore(err)) + return virtualfs.NewStaticDirectory(encodeAsPath("foo"), []fs.Entry{ &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(files[0].uuid), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(files[0].data))), - size: int64(len(files[0].data) + versionSize), + r: r1, + size: int64(len(files[0].data) + readers.VersionFormatSize), }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(files[1].uuid), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(files[1].data))), - size: int64(len(files[1].data) + versionSize), + r: r2, + size: int64(len(files[1].data) + readers.VersionFormatSize), }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( @@ -224,10 +235,10 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { } c := kopiaDataCollection{ - dir: getLayout(), + dir: getLayout(t), path: nil, items: items, - expectedVersion: serializationVersion, + expectedVersion: readers.DefaultSerializationVersion, } var ( @@ -291,23 +302,34 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() { // Needs to be a function so we can switch the serialization version as // needed. - getLayout := func(serVersion uint32) fs.Directory { + getLayout := func( + t *testing.T, + serVersion readers.SerializationVersion, + ) fs.Directory { + format := readers.SerializationFormat{Version: serVersion} + + r1, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader([]byte(noErrFileData)))) + require.NoError(t, err, clues.ToCore(err)) + + r2, err := readers.NewVersionedBackupReader( + format, + errReader.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + return virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{ &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(noErrFileName), nil), - r: newBackupStreamReader( - serVersion, - io.NopCloser(bytes.NewReader([]byte(noErrFileData)))), + r: r1, }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(errFileName), nil), - r: newBackupStreamReader( - serVersion, - errReader.ToReader()), + r: r2, }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( @@ -330,7 +352,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() { table := []struct { name string inputName string - inputSerializationVersion uint32 + inputSerializationVersion readers.SerializationVersion expectedData []byte lookupErr assert.ErrorAssertionFunc readErr assert.ErrorAssertionFunc @@ -339,7 +361,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() { { name: "FileFound_NoError", inputName: noErrFileName, - inputSerializationVersion: serializationVersion, + inputSerializationVersion: readers.DefaultSerializationVersion, expectedData: []byte(noErrFileData), lookupErr: assert.NoError, readErr: assert.NoError, @@ -347,21 +369,20 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() { { name: "FileFound_ReadError", inputName: errFileName, - inputSerializationVersion: serializationVersion, + inputSerializationVersion: readers.DefaultSerializationVersion, lookupErr: assert.NoError, readErr: assert.Error, }, { name: "FileFound_VersionError", inputName: noErrFileName, - inputSerializationVersion: serializationVersion + 1, - lookupErr: assert.NoError, - readErr: assert.Error, + inputSerializationVersion: readers.DefaultSerializationVersion + 1, + lookupErr: assert.Error, }, { name: "FileNotFound", inputName: "foo", - inputSerializationVersion: serializationVersion + 1, + inputSerializationVersion: readers.DefaultSerializationVersion + 1, lookupErr: assert.Error, notFoundErr: true, }, @@ -373,14 +394,14 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() { ctx, flush := tester.NewContext(t) defer flush() - root := getLayout(test.inputSerializationVersion) + root := getLayout(t, test.inputSerializationVersion) c := &i64counter{} col := &kopiaDataCollection{ path: pth, dir: root, counter: c, - expectedVersion: serializationVersion, + expectedVersion: readers.DefaultSerializationVersion, } s, err := col.FetchItemByName(ctx, test.inputName) diff --git a/src/internal/kopia/merge_collection_test.go b/src/internal/kopia/merge_collection_test.go index f89c2dd95..fefbfbb15 100644 --- a/src/internal/kopia/merge_collection_test.go +++ b/src/internal/kopia/merge_collection_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/service/exchange/mock" "github.com/alcionai/corso/src/internal/tester" @@ -150,20 +151,27 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() { require.NoError(suite.T(), err, clues.ToCore(err)) // Needs to be a function so the readers get refreshed each time. - layouts := []func() fs.Directory{ + layouts := []func(t *testing.T) fs.Directory{ // Has the following; // - file1: data[0] // - errOpen: (error opening file) - func() fs.Directory { + func(t *testing.T) fs.Directory { + format := readers.SerializationFormat{ + Version: readers.DefaultSerializationVersion, + } + + r1, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(fileData1))) + require.NoError(t, err, clues.ToCore(err)) + return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[0]), []fs.Entry{ &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(fileName1), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData1))), - size: int64(len(fileData1) + versionSize), + r: r1, + size: int64(len(fileData1) + readers.VersionFormatSize), }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( @@ -178,34 +186,47 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() { // - file1: data[1] // - file2: data[0] // - errOpen: data[2] - func() fs.Directory { + func(t *testing.T) fs.Directory { + format := readers.SerializationFormat{ + Version: readers.DefaultSerializationVersion, + } + + r1, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(fileData2))) + require.NoError(t, err, clues.ToCore(err)) + + r2, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(fileData1))) + require.NoError(t, err, clues.ToCore(err)) + + r3, err := readers.NewVersionedBackupReader( + format, + io.NopCloser(bytes.NewReader(fileData3))) + require.NoError(t, err, clues.ToCore(err)) + return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[1]), []fs.Entry{ &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(fileName1), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData2))), - size: int64(len(fileData2) + versionSize), + r: r1, + size: int64(len(fileData2) + readers.VersionFormatSize), }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(fileName2), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData1))), - size: int64(len(fileData1) + versionSize), + r: r2, + size: int64(len(fileData1) + readers.VersionFormatSize), }, &mockFile{ StreamingFile: virtualfs.StreamingFileFromReader( encodeAsPath(fileOpenErrName), nil), - r: newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData3))), - size: int64(len(fileData3) + versionSize), + r: r3, + size: int64(len(fileData3) + readers.VersionFormatSize), }, }) }, @@ -257,9 +278,9 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() { for i, layout := range layouts { col := &kopiaDataCollection{ path: pth, - dir: layout(), + dir: layout(t), counter: c, - expectedVersion: serializationVersion, + expectedVersion: readers.DefaultSerializationVersion, } err := dc.addCollection(colPaths[i], col) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index bc7a1f034..c1e1351e5 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -13,7 +13,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/alcionai/clues" "github.com/kopia/kopia/fs" @@ -24,6 +23,7 @@ import ( "github.com/alcionai/corso/src/internal/common/prefixmatcher" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/m365/graph" @@ -37,7 +37,7 @@ import ( const maxInflateTraversalDepth = 500 -var versionSize = int(unsafe.Sizeof(serializationVersion)) +var versionSize = readers.VersionFormatSize func newBackupStreamReader(version uint32, reader io.ReadCloser) *backupStreamReader { buf := make([]byte, versionSize) @@ -436,7 +436,7 @@ func collectionEntries( entry := virtualfs.StreamingFileWithModTimeFromReader( encodedName, modTime, - newBackupStreamReader(serializationVersion, e.ToReader())) + e.ToReader()) err = ctr(ctx, entry) if err != nil { diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index fd74cd9fa..c88da8af0 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -124,12 +124,6 @@ func expectFileData( return } - // Need to wrap with a restore stream reader to remove the version. - r = &restoreStreamReader{ - ReadCloser: io.NopCloser(r), - expectedVersion: serializationVersion, - } - got, err := io.ReadAll(r) if !assert.NoError(t, err, "reading data in file", name, clues.ToCore(err)) { return @@ -2420,9 +2414,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubt encodeElements(inboxFileName1)[0], time.Time{}, // Wrap with a backup reader so it gets the version injected. - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(inboxFileData1v2)))), + io.NopCloser(bytes.NewReader(inboxFileData1v2))), }), }), virtualfs.NewStaticDirectory( @@ -2582,9 +2574,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsMigrateSubt virtualfs.StreamingFileWithModTimeFromReader( encodeElements(inboxFileName1)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(inboxFileData1)))), + io.NopCloser(bytes.NewReader(inboxFileData1))), }), }), virtualfs.NewStaticDirectory( @@ -2596,9 +2586,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsMigrateSubt virtualfs.StreamingFileWithModTimeFromReader( encodeElements(contactsFileName1)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(contactsFileData1)))), + io.NopCloser(bytes.NewReader(contactsFileData1))), }), }), }) @@ -2817,15 +2805,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName5)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData5)))), + io.NopCloser(bytes.NewReader(fileData5))), virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName6)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData6)))), + io.NopCloser(bytes.NewReader(fileData6))), }) counters[folderID3] = count @@ -2835,15 +2819,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName3)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData3)))), + io.NopCloser(bytes.NewReader(fileData3))), virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName4)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData4)))), + io.NopCloser(bytes.NewReader(fileData4))), folder, }) counters[folderID2] = count @@ -2859,15 +2839,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName1)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData1)))), + io.NopCloser(bytes.NewReader(fileData1))), virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName2)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData2)))), + io.NopCloser(bytes.NewReader(fileData2))), folder, folder4, }) @@ -2879,15 +2855,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName7)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData7)))), + io.NopCloser(bytes.NewReader(fileData7))), virtualfs.StreamingFileWithModTimeFromReader( encodeElements(fileName8)[0], time.Time{}, - newBackupStreamReader( - serializationVersion, - io.NopCloser(bytes.NewReader(fileData8)))), + io.NopCloser(bytes.NewReader(fileData8))), }) counters[folderID5] = count diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 24e0708b5..10523de6c 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -18,6 +18,7 @@ import ( "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/prefixmatcher" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/observe" @@ -36,8 +37,6 @@ const ( // possibly corresponding to who is making the backup. corsoHost = "corso-host" corsoUser = "corso" - - serializationVersion uint32 = 1 ) // common manifest tags @@ -447,7 +446,7 @@ func loadDirsAndItems( dir: dir, items: dirItems.items, counter: bcounter, - expectedVersion: serializationVersion, + expectedVersion: readers.DefaultSerializationVersion, } if err := mergeCol.addCollection(dirItems.dir.String(), dc); err != nil { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 452bc4ffa..77721fc7b 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -1268,7 +1268,10 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { ctx, []identity.Reasoner{r}, nil, - []data.BackupCollection{dc1, dc2}, + []data.BackupCollection{ + dataMock.NewVersionedBackupCollection(t, dc1), + dataMock.NewVersionedBackupCollection(t, dc2), + }, nil, nil, true, @@ -1577,12 +1580,15 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { }) } - collections = append(collections, collection) + collections = append( + collections, + dataMock.NewVersionedBackupCollection(t, collection)) } r := NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory) - stats, deets, _, err := suite.w.ConsumeBackupCollections( + // Other tests check basic things about deets so not doing that again here. + stats, _, _, err := suite.w.ConsumeBackupCollections( suite.ctx, []identity.Reasoner{r}, nil, @@ -1597,8 +1603,6 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { require.Equal(t, stats.TotalDirectoryCount, expectedDirs) require.Equal(t, stats.IgnoredErrorCount, 0) require.False(t, stats.Incomplete) - // 6 file and 2 folder entries. - assert.Len(t, deets.Details().Entries, expectedFiles+2) suite.snapshotID = manifest.ID(stats.SnapshotID) } @@ -1629,7 +1633,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { excludePrefix bool expectedCachedItems int expectedUncachedItems int - cols func() []data.BackupCollection + cols func(t *testing.T) []data.BackupCollection backupIDCheck require.ValueAssertionFunc restoreCheck assert.ErrorAssertionFunc }{ @@ -1638,7 +1642,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { excludeItem: true, expectedCachedItems: len(suite.filesByPath) - 1, expectedUncachedItems: 0, - cols: func() []data.BackupCollection { + cols: func(t *testing.T) []data.BackupCollection { return nil }, backupIDCheck: require.NotEmpty, @@ -1650,7 +1654,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { excludePrefix: true, expectedCachedItems: len(suite.filesByPath) - 1, expectedUncachedItems: 0, - cols: func() []data.BackupCollection { + cols: func(t *testing.T) []data.BackupCollection { return nil }, backupIDCheck: require.NotEmpty, @@ -1661,7 +1665,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { // No snapshot should be made since there were no changes. expectedCachedItems: 0, expectedUncachedItems: 0, - cols: func() []data.BackupCollection { + cols: func(t *testing.T) []data.BackupCollection { return nil }, // Backup doesn't run. @@ -1671,7 +1675,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { name: "NoExcludeItemWithChanges", expectedCachedItems: len(suite.filesByPath), expectedUncachedItems: 1, - cols: func() []data.BackupCollection { + cols: func(t *testing.T) []data.BackupCollection { c := exchMock.NewCollection( suite.testPath1, suite.testPath1, @@ -1679,7 +1683,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { c.ColState = data.NotMovedState c.PrevPath = suite.testPath1 - return []data.BackupCollection{c} + return []data.BackupCollection{ + dataMock.NewVersionedBackupCollection(t, c), + } }, backupIDCheck: require.NotEmpty, restoreCheck: assert.NoError, @@ -1717,7 +1723,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { Manifest: man, Reasons: []identity.Reasoner{r}, }), - test.cols(), + test.cols(t), excluded, nil, true, diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go index 19de8e0dc..423c43930 100644 --- a/src/internal/m365/collection/drive/collection.go +++ b/src/internal/m365/collection/drive/collection.go @@ -584,15 +584,24 @@ func (oc *Collection) streamDriveItem( return progReader, nil }) - // We wrap the reader with a lazy reader so that the progress bar is only - // initialized if the file is read. Since we're not actually lazily reading - // data just use the eager item implementation. - oc.data <- data.NewUnindexedPrefetchedItem( + storeItem, err := data.NewUnindexedPrefetchedItem( metaReader, metaFileName+metaSuffix, // Metadata file should always use the latest time as // permissions change does not update mod time. time.Now()) + if err != nil { + errs.AddRecoverable(ctx, clues.Stack(err). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation)) + + return + } + + // We wrap the reader with a lazy reader so that the progress bar is only + // initialized if the file is read. Since we're not actually lazily reading + // data just use the eager item implementation. + oc.data <- storeItem // Item read successfully, add to collection if isFile { diff --git a/src/internal/m365/collection/drive/collection_test.go b/src/internal/m365/collection/drive/collection_test.go index b99a2484e..2e2f85160 100644 --- a/src/internal/m365/collection/drive/collection_test.go +++ b/src/internal/m365/collection/drive/collection_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" metaTD "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata/testdata" @@ -256,7 +257,7 @@ func (suite *CollectionUnitSuite) TestCollection() { mt := readItem.(data.ItemModTime) assert.Equal(t, now, mt.ModTime()) - readData, err := io.ReadAll(readItem.ToReader()) + rr, err := readers.NewVersionedRestoreReader(readItem.ToReader()) test.expectErr(t, err) if err != nil { @@ -267,13 +268,25 @@ func (suite *CollectionUnitSuite) TestCollection() { return } + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + assert.False(t, rr.Format().DelInFlight) + + readData, err := io.ReadAll(rr) + require.NoError(t, err, clues.ToCore(err)) + assert.Equal(t, stubItemContent, readData) readItemMeta := readItems[1] assert.Equal(t, stubItemID+metadata.MetaFileSuffix, readItemMeta.ID()) + rr, err = readers.NewVersionedRestoreReader(readItemMeta.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + assert.False(t, rr.Format().DelInFlight) + readMeta := metadata.Metadata{} - err = json.NewDecoder(readItemMeta.ToReader()).Decode(&readMeta) + err = json.NewDecoder(rr).Decode(&readMeta) require.NoError(t, err, clues.ToCore(err)) metaTD.AssertMetadataEqual(t, stubMeta, readMeta) @@ -485,12 +498,18 @@ func (suite *CollectionUnitSuite) TestCollectionPermissionBackupLatestModTime() for _, i := range readItems { if strings.HasSuffix(i.ID(), metadata.MetaFileSuffix) { - content, err := io.ReadAll(i.ToReader()) + rr, err := readers.NewVersionedRestoreReader(i.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + assert.False(t, rr.Format().DelInFlight) + + content, err := io.ReadAll(rr) require.NoError(t, err, clues.ToCore(err)) require.Equal(t, `{"filename":"Fake Item","permissionMode":1}`, string(content)) im, ok := i.(data.ItemModTime) - require.Equal(t, ok, true, "modtime interface") + require.True(t, ok, "modtime interface") require.Greater(t, im.ModTime(), mtime, "permissions time greater than mod time") } } diff --git a/src/internal/m365/collection/drive/collections_test.go b/src/internal/m365/collection/drive/collections_test.go index 1b50d074a..d0e33477f 100644 --- a/src/internal/m365/collection/drive/collections_test.go +++ b/src/internal/m365/collection/drive/collections_test.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/internal/common/prefixmatcher" pmMock "github.com/alcionai/corso/src/internal/common/prefixmatcher/mock" "github.com/alcionai/corso/src/internal/data" + dataMock "github.com/alcionai/corso/src/internal/data/mock" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" "github.com/alcionai/corso/src/internal/m365/graph" odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts" @@ -1114,7 +1115,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestDeserializeMetadata() { func(*support.ControllerOperationStatus) {}) require.NoError(t, err, clues.ToCore(err)) - cols = append(cols, data.NoFetchRestoreCollection{Collection: mc}) + cols = append(cols, dataMock.NewUnversionedRestoreCollection( + t, + data.NoFetchRestoreCollection{Collection: mc})) } deltas, paths, canUsePreviousBackup, err := deserializeMetadata(ctx, cols) @@ -2211,7 +2214,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { func(*support.ControllerOperationStatus) {}) assert.NoError(t, err, "creating metadata collection", clues.ToCore(err)) - prevMetadata := []data.RestoreCollection{data.NoFetchRestoreCollection{Collection: mc}} + prevMetadata := []data.RestoreCollection{ + dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: mc}), + } errs := fault.New(true) delList := prefixmatcher.NewStringSetBuilder() @@ -2238,7 +2243,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() { deltas, paths, _, err := deserializeMetadata( ctx, []data.RestoreCollection{ - data.NoFetchRestoreCollection{Collection: baseCol}, + dataMock.NewUnversionedRestoreCollection( + t, + data.NoFetchRestoreCollection{Collection: baseCol}), }) if !assert.NoError(t, err, "deserializing metadata", clues.ToCore(err)) { continue diff --git a/src/internal/m365/collection/exchange/backup_test.go b/src/internal/m365/collection/exchange/backup_test.go index 4b046fd47..bb6aad27c 100644 --- a/src/internal/m365/collection/exchange/backup_test.go +++ b/src/internal/m365/collection/exchange/backup_test.go @@ -15,7 +15,9 @@ import ( inMock "github.com/alcionai/corso/src/internal/common/idname/mock" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" + dataMock "github.com/alcionai/corso/src/internal/data/mock" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/operations/inject" @@ -322,7 +324,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { require.NoError(t, err, clues.ToCore(err)) cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{ - data.NoFetchRestoreCollection{Collection: coll}, + dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: coll}), }) test.expectError(t, err, clues.ToCore(err)) @@ -591,7 +593,7 @@ func (suite *BackupIntgSuite) TestDelta() { require.NotNil(t, metadata, "collections contains a metadata collection") cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{ - data.NoFetchRestoreCollection{Collection: metadata}, + dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: metadata}), }) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -666,7 +668,12 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() { for stream := range streamChannel { buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) + rr, err := readers.NewVersionedRestoreReader(stream.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + + read, err := buf.ReadFrom(rr) assert.NoError(t, err, clues.ToCore(err)) assert.NotZero(t, read) @@ -744,7 +751,13 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() { for stream := range edc.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) + + rr, err := readers.NewVersionedRestoreReader(stream.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + + read, err := buf.ReadFrom(rr) assert.NoError(t, err, clues.ToCore(err)) assert.NotZero(t, read) @@ -878,7 +891,12 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() { for item := range edc.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} - read, err := buf.ReadFrom(item.ToReader()) + rr, err := readers.NewVersionedRestoreReader(item.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + + read, err := buf.ReadFrom(rr) assert.NoError(t, err, clues.ToCore(err)) assert.NotZero(t, read) @@ -1198,7 +1216,9 @@ func checkMetadata( ) { catPaths, _, err := ParseMetadataCollections( ctx, - []data.RestoreCollection{data.NoFetchRestoreCollection{Collection: c}}) + []data.RestoreCollection{ + dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: c}), + }) if !assert.NoError(t, err, "getting metadata", clues.ToCore(err)) { return } diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index 30f28672d..71b9bb01b 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -278,10 +278,21 @@ func (col *prefetchCollection) streamItems( return } - stream <- data.NewPrefetchedItem( + item, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(itemData)), id, details.ItemInfo{Exchange: info}) + if err != nil { + el.AddRecoverable( + ctx, + clues.Stack(err). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation)) + + return + } + + stream <- item atomic.AddInt64(&success, 1) atomic.AddInt64(&totalBytes, info.Size) diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index 5e1665faa..f373bd1a5 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -17,6 +17,7 @@ import ( "golang.org/x/exp/slices" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/collection/exchange/mock" "github.com/alcionai/corso/src/internal/m365/graph" @@ -55,13 +56,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() { suite.Run(test.name, func() { t := suite.T() - ed := data.NewPrefetchedItem( + ed, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(test.readData)), "itemID", details.ItemInfo{}) + require.NoError(t, err, clues.ToCore(err)) + + r, err := readers.NewVersionedRestoreReader(ed.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) buf := &bytes.Buffer{} - _, err := buf.ReadFrom(ed.ToReader()) + _, err = buf.ReadFrom(r) assert.NoError(t, err, "reading data: %v", clues.ToCore(err)) assert.Equal(t, test.readData, buf.Bytes(), "read data") assert.Equal(t, "itemID", ed.ID(), "item ID") @@ -493,11 +501,11 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() { time.Now(), fault.New(true)) - _, err := li.(data.ItemInfo).Info() + _, err := li.Info() assert.Error(suite.T(), err, "Info without reading data should error") } -func (suite *CollectionUnitSuite) TestLazyItem() { +func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() { var ( parentPath = "inbox/private/silly cats" now = time.Now() @@ -505,44 +513,19 @@ func (suite *CollectionUnitSuite) TestLazyItem() { table := []struct { name string - modTime time.Time getErr error serializeErr error - expectModTime time.Time expectReadErrType error - dataCheck assert.ValueAssertionFunc - expectInfoErr bool - expectInfoErrType error }{ - { - name: "ReturnsEmptyReaderOnDeletedInFlight", - modTime: now, - getErr: graph.ErrDeletedInFlight, - dataCheck: assert.Empty, - expectInfoErr: true, - expectInfoErrType: data.ErrNotFound, - }, - { - name: "ReturnsValidReaderAndInfo", - modTime: now, - dataCheck: assert.NotEmpty, - expectModTime: now, - }, { name: "ReturnsErrorOnGenericGetError", - modTime: now, getErr: assert.AnError, expectReadErrType: assert.AnError, - dataCheck: assert.Empty, - expectInfoErr: true, }, { name: "ReturnsErrorOnGenericSerializeError", - modTime: now, serializeErr: assert.AnError, expectReadErrType: assert.AnError, - dataCheck: assert.Empty, - expectInfoErr: true, }, } @@ -575,47 +558,128 @@ func (suite *CollectionUnitSuite) TestLazyItem() { userID: "userID", itemID: "itemID", getter: getter, - modTime: test.modTime, + modTime: now, immutableIDs: false, parentPath: parentPath, }, "itemID", - test.modTime, + now, fault.New(true)) assert.False(t, li.Deleted(), "item shouldn't be marked deleted") - assert.Equal( - t, - test.modTime, - li.(data.ItemModTime).ModTime(), - "item mod time") + assert.Equal(t, now, li.ModTime(), "item mod time") - readData, err := io.ReadAll(li.ToReader()) - if test.expectReadErrType == nil { - assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) - } else { - assert.ErrorIs(t, err, test.expectReadErrType, "read error") - } - - test.dataCheck(t, readData, "read item data") - - info, err := li.(data.ItemInfo).Info() - - // Didn't expect an error getting info, it should be valid. - if !test.expectInfoErr { - assert.NoError(t, err, "getting item info: %v", clues.ToCore(err)) - assert.Equal(t, parentPath, info.Exchange.ParentPath) - assert.Equal(t, test.expectModTime, info.Modified()) - - return - } + _, err := readers.NewVersionedRestoreReader(li.ToReader()) + assert.ErrorIs(t, err, test.expectReadErrType) // Should get some form of error when trying to get info. + _, err = li.Info() assert.Error(t, err, "Info()") - - if test.expectInfoErrType != nil { - assert.ErrorIs(t, err, test.expectInfoErrType, "Info() error") - } }) } } + +func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() { + var ( + t = suite.T() + + parentPath = "inbox/private/silly cats" + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + getter := &mock.ItemGetSerialize{GetErr: graph.ErrDeletedInFlight} + + li := data.NewLazyItem( + ctx, + &lazyItemGetter{ + userID: "userID", + itemID: "itemID", + getter: getter, + modTime: now, + immutableIDs: false, + parentPath: parentPath, + }, + "itemID", + now, + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.True(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) + + assert.Empty(t, readData, "read item data") + + _, err = li.Info() + assert.ErrorIs(t, err, data.ErrNotFound, "Info() error") +} + +func (suite *CollectionUnitSuite) TestLazyItem() { + var ( + t = suite.T() + + parentPath = "inbox/private/silly cats" + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + // Exact data type doesn't really matter. + testData := models.NewMessage() + testData.SetSubject(ptr.To("hello world")) + + getter := &mock.ItemGetSerialize{GetData: testData} + + li := data.NewLazyItem( + ctx, + &lazyItemGetter{ + userID: "userID", + itemID: "itemID", + getter: getter, + modTime: now, + immutableIDs: false, + parentPath: parentPath, + }, + "itemID", + now, + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) + + assert.NotEmpty(t, readData, "read item data") + + info, err := li.Info() + assert.NoError(t, err, "getting item info: %v", clues.ToCore(err)) + + assert.Equal(t, parentPath, info.Exchange.ParentPath) + assert.Equal(t, now, info.Modified()) +} diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index b8ff3b436..0a1ca7212 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -150,27 +150,47 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { parentFolderID, id) if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer")) + el.AddRecoverable( + ctx, + clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)) + return } if err := writer.WriteObjectValue("", item); err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer")) + el.AddRecoverable( + ctx, + clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)) + return } itemData, err := writer.GetSerializedContent() if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message")) + el.AddRecoverable( + ctx, + clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)) + return } info.ParentPath = col.LocationPath().String() - col.stream <- data.NewPrefetchedItem( + storeItem, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(itemData)), id, details.ItemInfo{Groups: info}) + if err != nil { + el.AddRecoverable( + ctx, + clues.Stack(err). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation)) + + return + } + + col.stream <- storeItem atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&totalBytes, info.Size) diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index be4c52dc7..e0bf19d19 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/collection/groups/mock" "github.com/alcionai/corso/src/internal/m365/support" @@ -48,13 +49,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() { suite.Run(test.name, func() { t := suite.T() - ed := data.NewPrefetchedItem( + ed, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(test.readData)), "itemID", details.ItemInfo{}) + require.NoError(t, err, clues.ToCore(err)) + + r, err := readers.NewVersionedRestoreReader(ed.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) buf := &bytes.Buffer{} - _, err := buf.ReadFrom(ed.ToReader()) + _, err = buf.ReadFrom(r) assert.NoError(t, err, "reading data: %v", clues.ToCore(err)) assert.Equal(t, test.readData, buf.Bytes(), "read data") assert.Equal(t, "itemID", ed.ID(), "item ID") diff --git a/src/internal/m365/collection/site/collection.go b/src/internal/m365/collection/site/collection.go index 422ed4b2a..8af643d4b 100644 --- a/src/internal/m365/collection/site/collection.go +++ b/src/internal/m365/collection/site/collection.go @@ -211,11 +211,17 @@ func (sc *Collection) retrieveLists( metrics.Bytes += size metrics.Successes++ - sc.data <- data.NewPrefetchedItem( + + item, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), ptr.Val(lst.GetId()), details.ItemInfo{SharePoint: ListToSPInfo(lst, size)}) + if err != nil { + el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + continue + } + sc.data <- item progress <- struct{}{} } } @@ -272,11 +278,17 @@ func (sc *Collection) retrievePages( if size > 0 { metrics.Bytes += size metrics.Successes++ - sc.data <- data.NewPrefetchedItem( + + item, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), ptr.Val(pg.GetId()), details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)}) + if err != nil { + el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + continue + } + sc.data <- item progress <- struct{}{} } } diff --git a/src/internal/m365/collection/site/collection_test.go b/src/internal/m365/collection/site/collection_test.go index 3d0336217..5b53513f0 100644 --- a/src/internal/m365/collection/site/collection_test.go +++ b/src/internal/m365/collection/site/collection_test.go @@ -103,10 +103,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { byteArray, err := ow.GetSerializedContent() require.NoError(t, err, clues.ToCore(err)) - data := data.NewPrefetchedItem( + data, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), name, details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))}) + require.NoError(t, err, clues.ToCore(err)) return data }, @@ -132,10 +133,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { page, err := betaAPI.CreatePageFromBytes(byteArray) require.NoError(t, err, clues.ToCore(err)) - data := data.NewPrefetchedItem( + data, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), itemName, details.ItemInfo{SharePoint: betaAPI.PageInfo(page, int64(len(byteArray)))}) + require.NoError(t, err, clues.ToCore(err)) return data }, @@ -194,10 +196,11 @@ func (suite *SharePointCollectionSuite) TestListCollection_Restore() { byteArray, err := service.Serialize(listing) require.NoError(t, err, clues.ToCore(err)) - listData := data.NewPrefetchedItem( + listData, err := data.NewPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), testName, details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))}) + require.NoError(t, err, clues.ToCore(err)) destName := testdata.DefaultRestoreConfig("").Location diff --git a/src/internal/m365/graph/metadata_collection.go b/src/internal/m365/graph/metadata_collection.go index 7e06faaba..1c3d1f766 100644 --- a/src/internal/m365/graph/metadata_collection.go +++ b/src/internal/m365/graph/metadata_collection.go @@ -57,11 +57,16 @@ func (mce MetadataCollectionEntry) toMetadataItem() (metadataItem, error) { return metadataItem{}, clues.Wrap(err, "serializing metadata") } + item, err := data.NewUnindexedPrefetchedItem( + io.NopCloser(buf), + mce.fileName, + time.Now()) + if err != nil { + return metadataItem{}, clues.Stack(err) + } + return metadataItem{ - Item: data.NewUnindexedPrefetchedItem( - io.NopCloser(buf), - mce.fileName, - time.Now()), + Item: item, size: int64(buf.Len()), }, nil } diff --git a/src/internal/m365/graph/metadata_collection_test.go b/src/internal/m365/graph/metadata_collection_test.go index 0423cdf40..ee9ca6b5c 100644 --- a/src/internal/m365/graph/metadata_collection_test.go +++ b/src/internal/m365/graph/metadata_collection_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/tester" @@ -69,13 +70,16 @@ func (suite *MetadataCollectionUnitSuite) TestItems() { items := []metadataItem{} for i := 0; i < len(itemNames); i++ { + item, err := data.NewUnindexedPrefetchedItem( + io.NopCloser(bytes.NewReader(itemData[i])), + itemNames[i], + time.Time{}) + require.NoError(t, err, clues.ToCore(err)) + items = append( items, metadataItem{ - Item: data.NewUnindexedPrefetchedItem( - io.NopCloser(bytes.NewReader(itemData[i])), - itemNames[i], - time.Time{}), + Item: item, size: int64(len(itemData[i])), }) } @@ -103,7 +107,13 @@ func (suite *MetadataCollectionUnitSuite) TestItems() { for s := range c.Items(ctx, fault.New(true)) { gotNames = append(gotNames, s.ID()) - buf, err := io.ReadAll(s.ToReader()) + rr, err := readers.NewVersionedRestoreReader(s.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + assert.False(t, rr.Format().DelInFlight) + + buf, err := io.ReadAll(rr) if !assert.NoError(t, err, clues.ToCore(err)) { continue } @@ -204,11 +214,17 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() { for item := range col.Items(ctx, fault.New(true)) { assert.Equal(t, test.metadata.fileName, item.ID()) + rr, err := readers.NewVersionedRestoreReader(item.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) + assert.False(t, rr.Format().DelInFlight) + gotMap := map[string]string{} - decoder := json.NewDecoder(item.ToReader()) + decoder := json.NewDecoder(rr) itemCount++ - err := decoder.Decode(&gotMap) + err = decoder.Decode(&gotMap) if !assert.NoError(t, err, clues.ToCore(err)) { continue } diff --git a/src/internal/m365/helper_test.go b/src/internal/m365/helper_test.go index aee2c11bb..6f3907394 100644 --- a/src/internal/m365/helper_test.go +++ b/src/internal/m365/helper_test.go @@ -16,6 +16,7 @@ import ( "golang.org/x/exp/slices" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub" @@ -573,7 +574,12 @@ func compareExchangeEmail( expected map[string][]byte, item data.Item, ) { - itemData, err := io.ReadAll(item.ToReader()) + rr := versionedReadWrapper(t, item.ToReader()) + if rr == nil { + return + } + + itemData, err := io.ReadAll(rr) if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { return } @@ -600,7 +606,12 @@ func compareExchangeContact( expected map[string][]byte, item data.Item, ) { - itemData, err := io.ReadAll(item.ToReader()) + rr := versionedReadWrapper(t, item.ToReader()) + if rr == nil { + return + } + + itemData, err := io.ReadAll(rr) if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { return } @@ -628,7 +639,12 @@ func compareExchangeEvent( expected map[string][]byte, item data.Item, ) { - itemData, err := io.ReadAll(item.ToReader()) + rr := versionedReadWrapper(t, item.ToReader()) + if rr == nil { + return + } + + itemData, err := io.ReadAll(rr) if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { return } @@ -718,7 +734,12 @@ func compareDriveItem( return false } - buf, err := io.ReadAll(item.ToReader()) + rr := versionedReadWrapper(t, item.ToReader()) + if rr == nil { + return true + } + + buf, err := io.ReadAll(rr) if !assert.NoError(t, err, clues.ToCore(err)) { return true } @@ -850,6 +871,29 @@ func compareDriveItem( return true } +// versionedReaderWrapper strips out the version format header and checks it +// meets the current standard for all service types. If it doesn't meet the +// standard, returns nil. Else returns the versionedRestoreReader. +func versionedReadWrapper( + t *testing.T, + reader io.ReadCloser, +) io.ReadCloser { + rr, err := readers.NewVersionedRestoreReader(reader) + if !assert.NoError(t, err, clues.ToCore(err)) { + return nil + } + + if !assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) { + return nil + } + + if !assert.False(t, rr.Format().DelInFlight) { + return nil + } + + return rr +} + // compareItem compares the data returned by backup with the expected data. // Returns true if a comparison was done else false. Bool return is mostly used // to exclude OneDrive permissions for the root right now. diff --git a/src/internal/m365/service/sharepoint/api/pages_test.go b/src/internal/m365/service/sharepoint/api/pages_test.go index f462805d2..792e3eda0 100644 --- a/src/internal/m365/service/sharepoint/api/pages_test.go +++ b/src/internal/m365/service/sharepoint/api/pages_test.go @@ -109,10 +109,11 @@ func (suite *SharePointPageSuite) TestRestoreSinglePage() { //nolint:lll byteArray := spMock.Page("Byte Test") - pageData := data.NewUnindexedPrefetchedItem( + pageData, err := data.NewUnindexedPrefetchedItem( io.NopCloser(bytes.NewReader(byteArray)), testName, time.Now()) + require.NoError(t, err, clues.ToCore(err)) info, err := api.RestoreSitePage( ctx, diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index eb5673196..9246a9325 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -182,12 +182,17 @@ func collect( return nil, clues.Wrap(err, "marshalling body").WithClues(ctx) } + item, err := data.NewUnindexedPrefetchedItem( + io.NopCloser(bytes.NewReader(bs)), + col.itemName, + time.Now()) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + dc := streamCollection{ folderPath: p, - item: data.NewUnindexedPrefetchedItem( - io.NopCloser(bytes.NewReader(bs)), - col.itemName, - time.Now()), + item: item, } return &dc, nil