Inject versions in generic item (#4409)

Leverages the generic item struct to inject serialization format information for all items.
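
For reference, a minimal sketch of the backup-side pattern (the helper name `wrapForBackup` is hypothetical; the `readers` API names are the ones this PR uses in `NewUnindexedPrefetchedItem`):

```go
package data

import (
	"io"

	"github.com/alcionai/clues"

	"github.com/alcionai/corso/src/internal/common/readers"
)

// wrapForBackup is a hypothetical helper showing how the generic item
// constructors now inject the serialization format: the raw service reader
// is wrapped so the format header is prepended to the item's bytes during
// backup.
func wrapForBackup(raw io.ReadCloser) (io.ReadCloser, error) {
	r, err := readers.NewVersionedBackupReader(
		readers.SerializationFormat{Version: readers.DefaultSerializationVersion},
		raw)
	if err != nil {
		return nil, clues.Stack(err)
	}

	return r, nil
}
```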

Unwires the old code that injected versions in the kopia wrapper, but leaves some code in the wrapper to strip out the serialization format during restore.
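
A corresponding sketch of the restore-side strip that stays in the kopia wrapper for now (condensed from `kopiaDataCollection.FetchItemByName` in this PR; pulling it out as a standalone function is illustrative only):

```go
package kopia

import (
	"io"

	"github.com/alcionai/clues"

	"github.com/alcionai/corso/src/internal/common/readers"
)

// stripVersion reads the serialization format header back off an item during
// restore, validates it, and returns a reader positioned at the item data.
func stripVersion(
	r io.ReadCloser,
	expected readers.SerializationVersion,
) (io.ReadCloser, error) {
	rr, err := readers.NewVersionedRestoreReader(r)
	if err != nil {
		return nil, clues.Stack(err)
	}

	if rr.Format().Version != expected {
		return nil, clues.New("unexpected data format")
	}

	// Items deleted in flight are backed up as empty, flagged files and
	// shouldn't be selectable during restore.
	if rr.Format().DelInFlight {
		return nil, clues.New("selected item marked as deleted in flight")
	}

	return rr, nil
}
```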

Future PRs should move the process of pulling out the serialization format into the individual services.

Viewing the changes commit by commit may make review easier.

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4328

#### Test Plan

- [x] 💪 Manual
- [x] Unit test
- [x] 💚 E2E

---

Commit f0ccf35b5c (parent 8e080f83b7)
ashmrtn, 2023-09-29 13:16:01 -07:00; committed by GitHub
25 changed files with 606 additions and 266 deletions

---

@@ -38,8 +38,8 @@ type SerializationVersion uint16
 const DefaultSerializationVersion SerializationVersion = 1

 const (
-	versionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0)))
-	delInFlightMask   persistedSerializationVersion = 1 << ((versionFormatSize * 8) - 1)
+	VersionFormatSize = int(unsafe.Sizeof(persistedSerializationVersion(0)))
+	delInFlightMask   persistedSerializationVersion = 1 << ((VersionFormatSize * 8) - 1)
 )

 // SerializationFormat is a struct describing serialization format versions and
@@ -68,7 +68,7 @@ func NewVersionedBackupReader(
 		formattedVersion |= delInFlightMask
 	}

-	formattedBuf := make([]byte, versionFormatSize)
+	formattedBuf := make([]byte, VersionFormatSize)
 	binary.BigEndian.PutUint32(formattedBuf, formattedVersion)
 	versionReader := io.NopCloser(bytes.NewReader(formattedBuf))
@@ -139,10 +139,10 @@ func (vbr *versionedBackupReader) Close() error {
 func NewVersionedRestoreReader(
 	baseReader io.ReadCloser,
 ) (*VersionedRestoreReader, error) {
-	versionBuf := make([]byte, versionFormatSize)
+	versionBuf := make([]byte, VersionFormatSize)

 	// Loop to account for the unlikely case where we get a short read.
-	for read := 0; read < versionFormatSize; {
+	for read := 0; read < VersionFormatSize; {
 		n, err := baseReader.Read(versionBuf[read:])
 		if err != nil {
 			return nil, clues.Wrap(err, "reading serialization version")

---

@@ -1,7 +1,6 @@
 package data

 import (
-	"bytes"
 	"context"
 	"io"
 	"sync"
@@ -10,6 +9,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/spatialcurrent/go-lazy/pkg/lazy"

+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
@@ -46,12 +46,19 @@ func NewUnindexedPrefetchedItem(
 	reader io.ReadCloser,
 	itemID string,
 	modTime time.Time,
-) Item {
+) (*unindexedPrefetchedItem, error) {
+	r, err := readers.NewVersionedBackupReader(
+		readers.SerializationFormat{Version: readers.DefaultSerializationVersion},
+		reader)
+	if err != nil {
+		return nil, clues.Stack(err)
+	}
+
 	return &unindexedPrefetchedItem{
 		id:      itemID,
-		reader:  reader,
+		reader:  r,
 		modTime: modTime,
-	}
+	}, nil
 }

 // unindexedPrefetchedItem represents a single item retrieved from the remote
@@ -92,15 +99,16 @@ func NewPrefetchedItem(
 	reader io.ReadCloser,
 	itemID string,
 	info details.ItemInfo,
-) Item {
-	return &prefetchedItem{
-		unindexedPrefetchedItem: unindexedPrefetchedItem{
-			id:      itemID,
-			reader:  reader,
-			modTime: info.Modified(),
-		},
-		info: info,
+) (*prefetchedItem, error) {
+	inner, err := NewUnindexedPrefetchedItem(reader, itemID, info.Modified())
+	if err != nil {
+		return nil, clues.Stack(err)
 	}
+
+	return &prefetchedItem{
+		unindexedPrefetchedItem: inner,
+		info:                    info,
+	}, nil
 }

 // prefetchedItem represents a single item retrieved from the remote service.
@@ -108,7 +116,7 @@ func NewPrefetchedItem(
 // This item implements ItemInfo so it should be used for things that need to
 // appear in backup details.
 type prefetchedItem struct {
-	unindexedPrefetchedItem
+	*unindexedPrefetchedItem

 	info details.ItemInfo
 }
@@ -129,7 +137,7 @@ func NewUnindexedLazyItem(
 	itemID string,
 	modTime time.Time,
 	errs *fault.Bus,
-) Item {
+) *unindexedLazyItem {
 	return &unindexedLazyItem{
 		ctx: ctx,
 		id:  itemID,
@@ -182,6 +190,10 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser {
 			return nil, clues.Stack(err)
 		}

+		format := readers.SerializationFormat{
+			Version: readers.DefaultSerializationVersion,
+		}
+
 		// If an item was deleted then return an empty file so we don't fail the
 		// backup and return a sentinel error when asked for ItemInfo so we don't
 		// display the item in the backup.
@@ -193,13 +205,17 @@ func (i *unindexedLazyItem) ToReader() io.ReadCloser {
 			logger.Ctx(i.ctx).Info("item not found")

 			i.delInFlight = true
+			format.DelInFlight = true

-			return io.NopCloser(bytes.NewReader([]byte{})), nil
+			r, err := readers.NewVersionedBackupReader(format)
+
+			return r, clues.Stack(err).OrNil()
 		}

 		i.info = info

-		return reader, nil
+		r, err := readers.NewVersionedBackupReader(format, reader)
+
+		return r, clues.Stack(err).OrNil()
 	})
 }
@@ -217,15 +233,14 @@ func NewLazyItem(
 	itemID string,
 	modTime time.Time,
 	errs *fault.Bus,
-) Item {
+) *lazyItem {
 	return &lazyItem{
-		unindexedLazyItem: unindexedLazyItem{
-			ctx:        ctx,
-			id:         itemID,
-			itemGetter: itemGetter,
-			modTime:    modTime,
-			errs:       errs,
-		},
+		unindexedLazyItem: NewUnindexedLazyItem(
+			ctx,
+			itemGetter,
+			itemID,
+			modTime,
+			errs),
 	}
 }
@@ -236,7 +251,7 @@ func NewLazyItem(
 // This item implements ItemInfo so it should be used for things that need to
 // appear in backup details.
 type lazyItem struct {
-	unindexedLazyItem
+	*unindexedLazyItem
 }

 func (i *lazyItem) Info() (details.ItemInfo, error) {

---

@@ -12,6 +12,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"

+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/backup/details"
@@ -50,11 +51,15 @@ func TestItemUnitSuite(t *testing.T) {
 }

 func (suite *ItemUnitSuite) TestUnindexedPrefetchedItem() {
-	prefetch := data.NewUnindexedPrefetchedItem(
+	prefetch, err := data.NewUnindexedPrefetchedItem(
 		io.NopCloser(bytes.NewReader([]byte{})),
 		"foo",
 		time.Time{})
-	_, ok := prefetch.(data.ItemInfo)
+	require.NoError(suite.T(), err, clues.ToCore(err))
+
+	var item data.Item = prefetch
+
+	_, ok := item.(data.ItemInfo)
 	assert.False(suite.T(), ok, "unindexedPrefetchedItem implements Info()")
 }
@@ -70,7 +75,10 @@ func (suite *ItemUnitSuite) TestUnindexedLazyItem() {
 		"foo",
 		time.Time{},
 		fault.New(true))
-	_, ok := lazy.(data.ItemInfo)
+
+	var item data.Item = lazy
+
+	_, ok := item.(data.ItemInfo)
 	assert.False(t, ok, "unindexedLazyItem implements Info()")
 }
@@ -140,18 +148,29 @@ func (suite *ItemUnitSuite) TestPrefetchedItem() {
 		suite.Run(test.name, func() {
 			t := suite.T()

-			item := data.NewPrefetchedItem(test.reader, id, test.info)
+			item, err := data.NewPrefetchedItem(test.reader, id, test.info)
+			require.NoError(t, err, clues.ToCore(err))

 			assert.Equal(t, id, item.ID(), "ID")
 			assert.False(t, item.Deleted(), "deleted")
 			assert.Equal(
 				t,
 				test.info.Modified(),
-				item.(data.ItemModTime).ModTime(),
+				item.ModTime(),
 				"mod time")

-			readData, err := io.ReadAll(item.ToReader())
-			test.readErr(t, err, clues.ToCore(err), "read error")
+			r, err := readers.NewVersionedRestoreReader(item.ToReader())
+			require.NoError(t, err, "version error: %v", clues.ToCore(err))
+
+			if err != nil {
+				return
+			}
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
+
+			readData, err := io.ReadAll(r)
+			test.readErr(t, err, "read error: %v", clues.ToCore(err))

 			assert.Equal(t, test.expectData, readData, "read data")
 		})
 	}
@@ -194,6 +213,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 	table := []struct {
 		name       string
 		mid        *mockItemDataGetter
+		versionErr assert.ErrorAssertionFunc
 		readErr    assert.ErrorAssertionFunc
 		infoErr    assert.ErrorAssertionFunc
 		expectData []byte
@@ -205,6 +225,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 			reader: io.NopCloser(bytes.NewReader([]byte{})),
 			info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
 		},
+		versionErr: assert.NoError,
 		readErr:    assert.NoError,
 		infoErr:    assert.NoError,
 		expectData: []byte{},
@@ -215,6 +236,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 			reader: io.NopCloser(bytes.NewReader(baseData)),
 			info:   &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
 		},
+		versionErr: assert.NoError,
 		readErr:    assert.NoError,
 		infoErr:    assert.NoError,
 		expectData: baseData,
@@ -225,6 +247,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 			reader: io.NopCloser(bytes.NewReader(baseData)),
 			info:   &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
 		},
+		versionErr: assert.NoError,
 		readErr:    assert.NoError,
 		infoErr:    assert.NoError,
 		expectData: baseData,
@@ -234,6 +257,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 		mid: &mockItemDataGetter{
 			err: assert.AnError,
 		},
+		versionErr: assert.Error,
 		readErr:    assert.Error,
 		infoErr:    assert.Error,
 		expectData: []byte{},
@@ -249,6 +273,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 			},
 			info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
 		},
+		versionErr: assert.NoError,
 		readErr:    assert.Error,
 		infoErr:    assert.NoError,
 		expectData: baseData[:5],
@@ -278,15 +303,25 @@ func (suite *ItemUnitSuite) TestLazyItem() {
 			assert.Equal(
 				t,
 				now,
-				item.(data.ItemModTime).ModTime(),
+				item.ModTime(),
 				"mod time")

 			// Read data to execute lazy reader.
-			readData, err := io.ReadAll(item.ToReader())
+			r, err := readers.NewVersionedRestoreReader(item.ToReader())
+			test.versionErr(t, err, "version error: %v", clues.ToCore(err))
+
+			if err != nil {
+				return
+			}
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)
+
+			readData, err := io.ReadAll(r)
 			test.readErr(t, err, clues.ToCore(err), "read error")
 			assert.Equal(t, test.expectData, readData, "read data")

-			_, err = item.(data.ItemInfo).Info()
+			_, err = item.Info()
 			test.infoErr(t, err, "Info(): %v", clues.ToCore(err))

 			e := errs.Errors()
@@ -326,15 +361,21 @@ func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() {
 	assert.Equal(
 		t,
 		now,
-		item.(data.ItemModTime).ModTime(),
+		item.ModTime(),
 		"mod time")

 	// Read data to execute lazy reader.
-	readData, err := io.ReadAll(item.ToReader())
+	r, err := readers.NewVersionedRestoreReader(item.ToReader())
+	require.NoError(t, err, "version error: %v", clues.ToCore(err))
+
+	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+	assert.True(t, r.Format().DelInFlight)
+
+	readData, err := io.ReadAll(r)
 	require.NoError(t, err, clues.ToCore(err), "read error")
 	assert.Empty(t, readData, "read data")

-	_, err = item.(data.ItemInfo).Info()
+	_, err = item.Info()
 	assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")

 	e := errs.Errors()
@@ -366,9 +407,9 @@ func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() {
 	assert.Equal(
 		t,
 		now,
-		item.(data.ItemModTime).ModTime(),
+		item.ModTime(),
 		"mod time")

-	_, err := item.(data.ItemInfo).Info()
+	_, err := item.Info()
 	assert.Error(t, err, "Info() error")
 }

---

@@ -7,6 +7,7 @@ import (
 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"

+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
@@ -16,6 +17,7 @@ import (
 var (
 	_ data.RestoreCollection = &kopiaDataCollection{}
 	_ data.Item              = &kopiaDataStream{}
+	_ data.ItemSize          = &kopiaDataStream{}
 )

 type kopiaDataCollection struct {
@@ -23,7 +25,7 @@ type kopiaDataCollection struct {
 	dir     fs.Directory
 	items   []string
 	counter ByteCounter
-	expectedVersion uint32
+	expectedVersion readers.SerializationVersion
 }

 func (kdc *kopiaDataCollection) Items(
@@ -102,7 +104,7 @@ func (kdc kopiaDataCollection) FetchItemByName(
 		return nil, clues.New("object is not a file").WithClues(ctx)
 	}

-	size := f.Size() - int64(versionSize)
+	size := f.Size() - int64(readers.VersionFormatSize)

 	if size < 0 {
 		logger.Ctx(ctx).Infow("negative file size; resetting to 0", "file_size", size)
@@ -118,13 +120,32 @@ func (kdc kopiaDataCollection) FetchItemByName(
 		return nil, clues.Wrap(err, "opening file").WithClues(ctx)
 	}

+	// TODO(ashmrtn): Remove this when individual services implement checks for
+	// version and deleted items.
+	rr, err := readers.NewVersionedRestoreReader(r)
+	if err != nil {
+		return nil, clues.Stack(err).WithClues(ctx)
+	}
+
+	if rr.Format().Version != kdc.expectedVersion {
+		return nil, clues.New("unexpected data format").
+			WithClues(ctx).
+			With(
+				"read_version", rr.Format().Version,
+				"expected_version", kdc.expectedVersion)
+	}
+
+	// This is a conservative check, but we shouldn't be seeing items that were
+	// deleted in flight during restores because there's no way to select them.
+	if rr.Format().DelInFlight {
+		return nil, clues.New("selected item marked as deleted in flight").
+			WithClues(ctx)
+	}
+
 	return &kopiaDataStream{
 		id: name,
-		reader: &restoreStreamReader{
-			ReadCloser:      r,
-			expectedVersion: kdc.expectedVersion,
-		},
-		size: size,
+		reader: rr,
+		size:   size,
 	}, nil
 }

---

@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"

+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -121,25 +122,35 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
 	)

 	// Needs to be a function so the readers get refreshed each time.
-	getLayout := func() fs.Directory {
+	getLayout := func(t *testing.T) fs.Directory {
+		format := readers.SerializationFormat{
+			Version: readers.DefaultSerializationVersion,
+		}
+
+		r1, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader(files[0].data)))
+		require.NoError(t, err, clues.ToCore(err))
+
+		r2, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader(files[1].data)))
+		require.NoError(t, err, clues.ToCore(err))
+
 		return virtualfs.NewStaticDirectory(encodeAsPath("foo"), []fs.Entry{
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(files[0].uuid),
 					nil),
-				r: newBackupStreamReader(
-					serializationVersion,
-					io.NopCloser(bytes.NewReader(files[0].data))),
-				size: int64(len(files[0].data) + versionSize),
+				r:    r1,
+				size: int64(len(files[0].data) + readers.VersionFormatSize),
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(files[1].uuid),
 					nil),
-				r: newBackupStreamReader(
-					serializationVersion,
-					io.NopCloser(bytes.NewReader(files[1].data))),
-				size: int64(len(files[1].data) + versionSize),
+				r:    r2,
+				size: int64(len(files[1].data) + readers.VersionFormatSize),
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
@@ -224,10 +235,10 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
 			}

 			c := kopiaDataCollection{
-				dir:             getLayout(),
+				dir:             getLayout(t),
 				path:            nil,
 				items:           items,
-				expectedVersion: serializationVersion,
+				expectedVersion: readers.DefaultSerializationVersion,
 			}

 			var (
@@ -291,23 +302,34 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 	// Needs to be a function so we can switch the serialization version as
 	// needed.
-	getLayout := func(serVersion uint32) fs.Directory {
+	getLayout := func(
+		t *testing.T,
+		serVersion readers.SerializationVersion,
+	) fs.Directory {
+		format := readers.SerializationFormat{Version: serVersion}
+
+		r1, err := readers.NewVersionedBackupReader(
+			format,
+			io.NopCloser(bytes.NewReader([]byte(noErrFileData))))
+		require.NoError(t, err, clues.ToCore(err))
+
+		r2, err := readers.NewVersionedBackupReader(
+			format,
+			errReader.ToReader())
+		require.NoError(t, err, clues.ToCore(err))
+
 		return virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(noErrFileName),
 					nil),
-				r: newBackupStreamReader(
-					serVersion,
-					io.NopCloser(bytes.NewReader([]byte(noErrFileData)))),
+				r: r1,
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
 					encodeAsPath(errFileName),
 					nil),
-				r: newBackupStreamReader(
-					serVersion,
-					errReader.ToReader()),
+				r: r2,
 			},
 			&mockFile{
 				StreamingFile: virtualfs.StreamingFileFromReader(
@@ -330,7 +352,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 	table := []struct {
 		name                      string
 		inputName                 string
-		inputSerializationVersion uint32
+		inputSerializationVersion readers.SerializationVersion
 		expectedData              []byte
 		lookupErr                 assert.ErrorAssertionFunc
 		readErr                   assert.ErrorAssertionFunc
@@ -339,7 +361,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 		{
 			name:                      "FileFound_NoError",
 			inputName:                 noErrFileName,
-			inputSerializationVersion: serializationVersion,
+			inputSerializationVersion: readers.DefaultSerializationVersion,
 			expectedData:              []byte(noErrFileData),
 			lookupErr:                 assert.NoError,
 			readErr:                   assert.NoError,
@@ -347,21 +369,20 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 		{
 			name:                      "FileFound_ReadError",
 			inputName:                 errFileName,
-			inputSerializationVersion: serializationVersion,
+			inputSerializationVersion: readers.DefaultSerializationVersion,
 			lookupErr:                 assert.NoError,
 			readErr:                   assert.Error,
 		},
 		{
 			name:                      "FileFound_VersionError",
 			inputName:                 noErrFileName,
-			inputSerializationVersion: serializationVersion + 1,
-			lookupErr:                 assert.NoError,
-			readErr:                   assert.Error,
+			inputSerializationVersion: readers.DefaultSerializationVersion + 1,
+			lookupErr:                 assert.Error,
 		},
 		{
 			name:                      "FileNotFound",
 			inputName:                 "foo",
-			inputSerializationVersion: serializationVersion + 1,
+			inputSerializationVersion: readers.DefaultSerializationVersion + 1,
 			lookupErr:                 assert.Error,
 			notFoundErr:               true,
 		},
@@ -373,14 +394,14 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 			ctx, flush := tester.NewContext(t)
 			defer flush()

-			root := getLayout(test.inputSerializationVersion)
+			root := getLayout(t, test.inputSerializationVersion)
 			c := &i64counter{}

 			col := &kopiaDataCollection{
 				path:            pth,
 				dir:             root,
 				counter:         c,
-				expectedVersion: serializationVersion,
+				expectedVersion: readers.DefaultSerializationVersion,
 			}

 			s, err := col.FetchItemByName(ctx, test.inputName)

---

@@ -12,6 +12,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"

+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -150,20 +151,27 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
 	require.NoError(suite.T(), err, clues.ToCore(err))

 	// Needs to be a function so the readers get refreshed each time.
-	layouts := []func() fs.Directory{
+	layouts := []func(t *testing.T) fs.Directory{
 		// Has the following;
 		//   - file1: data[0]
 		//   - errOpen: (error opening file)
-		func() fs.Directory {
+		func(t *testing.T) fs.Directory {
+			format := readers.SerializationFormat{
+				Version: readers.DefaultSerializationVersion,
+			}
+
+			r1, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData1)))
+			require.NoError(t, err, clues.ToCore(err))
+
 			return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[0]), []fs.Entry{
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName1),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData1))),
-					size: int64(len(fileData1) + versionSize),
+					r:    r1,
+					size: int64(len(fileData1) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
@@ -178,34 +186,47 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
 		//   - file1: data[1]
 		//   - file2: data[0]
 		//   - errOpen: data[2]
-		func() fs.Directory {
+		func(t *testing.T) fs.Directory {
+			format := readers.SerializationFormat{
+				Version: readers.DefaultSerializationVersion,
+			}
+
+			r1, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData2)))
+			require.NoError(t, err, clues.ToCore(err))
+
+			r2, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData1)))
+			require.NoError(t, err, clues.ToCore(err))
+
+			r3, err := readers.NewVersionedBackupReader(
+				format,
+				io.NopCloser(bytes.NewReader(fileData3)))
+			require.NoError(t, err, clues.ToCore(err))
+
 			return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[1]), []fs.Entry{
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName1),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData2))),
-					size: int64(len(fileData2) + versionSize),
+					r:    r1,
+					size: int64(len(fileData2) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileName2),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData1))),
-					size: int64(len(fileData1) + versionSize),
+					r:    r2,
+					size: int64(len(fileData1) + readers.VersionFormatSize),
 				},
 				&mockFile{
 					StreamingFile: virtualfs.StreamingFileFromReader(
 						encodeAsPath(fileOpenErrName),
 						nil),
-					r: newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(fileData3))),
-					size: int64(len(fileData3) + versionSize),
+					r:    r3,
+					size: int64(len(fileData3) + readers.VersionFormatSize),
 				},
 			})
 		},
@@ -257,9 +278,9 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
 	for i, layout := range layouts {
 		col := &kopiaDataCollection{
 			path:            pth,
-			dir:             layout(),
+			dir:             layout(t),
 			counter:         c,
-			expectedVersion: serializationVersion,
+			expectedVersion: readers.DefaultSerializationVersion,
 		}

 		err := dc.addCollection(colPaths[i], col)

---

@@ -13,7 +13,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-	"unsafe"

 	"github.com/alcionai/clues"
 	"github.com/kopia/kopia/fs"
@@ -24,6 +23,7 @@ import (
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/diagnostics"
 	"github.com/alcionai/corso/src/internal/m365/graph"
@@ -37,7 +37,7 @@ import (
 const maxInflateTraversalDepth = 500

-var versionSize = int(unsafe.Sizeof(serializationVersion))
+var versionSize = readers.VersionFormatSize

 func newBackupStreamReader(version uint32, reader io.ReadCloser) *backupStreamReader {
 	buf := make([]byte, versionSize)
@@ -436,7 +436,7 @@ func collectionEntries(
 			entry := virtualfs.StreamingFileWithModTimeFromReader(
 				encodedName,
 				modTime,
-				newBackupStreamReader(serializationVersion, e.ToReader()))
+				e.ToReader())

 			err = ctr(ctx, entry)
 			if err != nil {

---

@@ -124,12 +124,6 @@ func expectFileData(
 		return
 	}

-	// Need to wrap with a restore stream reader to remove the version.
-	r = &restoreStreamReader{
-		ReadCloser:      io.NopCloser(r),
-		expectedVersion: serializationVersion,
-	}
-
 	got, err := io.ReadAll(r)
 	if !assert.NoError(t, err, "reading data in file", name, clues.ToCore(err)) {
 		return
@@ -2420,9 +2414,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubt
 					encodeElements(inboxFileName1)[0],
 					time.Time{},
 					// Wrap with a backup reader so it gets the version injected.
-					newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(inboxFileData1v2)))),
+					io.NopCloser(bytes.NewReader(inboxFileData1v2))),
 			}),
 		}),
 	virtualfs.NewStaticDirectory(
@@ -2582,9 +2574,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsMigrateSubt
 				virtualfs.StreamingFileWithModTimeFromReader(
 					encodeElements(inboxFileName1)[0],
 					time.Time{},
-					newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(inboxFileData1)))),
+					io.NopCloser(bytes.NewReader(inboxFileData1))),
 			}),
 		}),
 	virtualfs.NewStaticDirectory(
@@ -2596,9 +2586,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsMigrateSubt
 				virtualfs.StreamingFileWithModTimeFromReader(
 					encodeElements(contactsFileName1)[0],
 					time.Time{},
-					newBackupStreamReader(
-						serializationVersion,
-						io.NopCloser(bytes.NewReader(contactsFileData1)))),
+					io.NopCloser(bytes.NewReader(contactsFileData1))),
 			}),
 		}),
 	})
@@ -2817,15 +2805,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName5)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData5)))),
+			io.NopCloser(bytes.NewReader(fileData5))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName6)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData6)))),
+			io.NopCloser(bytes.NewReader(fileData6))),
 	})

 	counters[folderID3] = count
@@ -2835,15 +2819,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName3)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData3)))),
+			io.NopCloser(bytes.NewReader(fileData3))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName4)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData4)))),
+			io.NopCloser(bytes.NewReader(fileData4))),
 		folder,
 	})

 	counters[folderID2] = count
@@ -2859,15 +2839,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName1)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData1)))),
+			io.NopCloser(bytes.NewReader(fileData1))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName2)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData2)))),
+			io.NopCloser(bytes.NewReader(fileData2))),
 		folder,
 		folder4,
 	})
@@ -2879,15 +2855,11 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName7)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData7)))),
+			io.NopCloser(bytes.NewReader(fileData7))),
 		virtualfs.StreamingFileWithModTimeFromReader(
 			encodeElements(fileName8)[0],
 			time.Time{},
-			newBackupStreamReader(
-				serializationVersion,
-				io.NopCloser(bytes.NewReader(fileData8)))),
+			io.NopCloser(bytes.NewReader(fileData8))),
 	})

 	counters[folderID5] = count

---

@@ -18,6 +18,7 @@ import (
 	"golang.org/x/exp/maps"

 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/diagnostics"
 	"github.com/alcionai/corso/src/internal/observe"
@@ -36,8 +37,6 @@ const (
 	// possibly corresponding to who is making the backup.
 	corsoHost = "corso-host"
 	corsoUser = "corso"
-
-	serializationVersion uint32 = 1
 )

 // common manifest tags
@@ -447,7 +446,7 @@ func loadDirsAndItems(
 			dir:             dir,
 			items:           dirItems.items,
 			counter:         bcounter,
-			expectedVersion: serializationVersion,
+			expectedVersion: readers.DefaultSerializationVersion,
 		}

 		if err := mergeCol.addCollection(dirItems.dir.String(), dc); err != nil {

---

@@ -1268,7 +1268,10 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
 		ctx,
 		[]identity.Reasoner{r},
 		nil,
-		[]data.BackupCollection{dc1, dc2},
+		[]data.BackupCollection{
+			dataMock.NewVersionedBackupCollection(t, dc1),
+			dataMock.NewVersionedBackupCollection(t, dc2),
+		},
 		nil,
 		nil,
 		true,
@@ -1577,12 +1580,15 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
 			})
 		}

-		collections = append(collections, collection)
+		collections = append(
+			collections,
+			dataMock.NewVersionedBackupCollection(t, collection))
 	}

 	r := NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory)

-	stats, deets, _, err := suite.w.ConsumeBackupCollections(
+	// Other tests check basic things about deets so not doing that again here.
+	stats, _, _, err := suite.w.ConsumeBackupCollections(
 		suite.ctx,
 		[]identity.Reasoner{r},
 		nil,
@@ -1597,8 +1603,6 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
 	require.Equal(t, stats.TotalDirectoryCount, expectedDirs)
 	require.Equal(t, stats.IgnoredErrorCount, 0)
 	require.False(t, stats.Incomplete)
-	// 6 file and 2 folder entries.
-	assert.Len(t, deets.Details().Entries, expectedFiles+2)

 	suite.snapshotID = manifest.ID(stats.SnapshotID)
 }
@@ -1629,7 +1633,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 		excludePrefix         bool
 		expectedCachedItems   int
 		expectedUncachedItems int
-		cols                  func() []data.BackupCollection
+		cols                  func(t *testing.T) []data.BackupCollection
 		backupIDCheck         require.ValueAssertionFunc
 		restoreCheck          assert.ErrorAssertionFunc
 	}{
@@ -1638,7 +1642,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 			excludeItem:           true,
 			expectedCachedItems:   len(suite.filesByPath) - 1,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			backupIDCheck: require.NotEmpty,
@@ -1650,7 +1654,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 			excludePrefix:         true,
 			expectedCachedItems:   len(suite.filesByPath) - 1,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			backupIDCheck: require.NotEmpty,
@@ -1661,7 +1665,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 			// No snapshot should be made since there were no changes.
 			expectedCachedItems:   0,
 			expectedUncachedItems: 0,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				return nil
 			},
 			// Backup doesn't run.
@@ -1671,7 +1675,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 			name:                  "NoExcludeItemWithChanges",
 			expectedCachedItems:   len(suite.filesByPath),
 			expectedUncachedItems: 1,
-			cols: func() []data.BackupCollection {
+			cols: func(t *testing.T) []data.BackupCollection {
 				c := exchMock.NewCollection(
 					suite.testPath1,
 					suite.testPath1,
@@ -1679,7 +1683,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 				c.ColState = data.NotMovedState
 				c.PrevPath = suite.testPath1

-				return []data.BackupCollection{c}
+				return []data.BackupCollection{
+					dataMock.NewVersionedBackupCollection(t, c),
+				}
 			},
 			backupIDCheck: require.NotEmpty,
 			restoreCheck:  assert.NoError,
@@ -1717,7 +1723,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 					Manifest: man,
 					Reasons:  []identity.Reasoner{r},
 				}),
-				test.cols(),
+				test.cols(t),
 				excluded,
 				nil,
 				true,

---

@@ -584,15 +584,24 @@ func (oc *Collection) streamDriveItem(
 			return progReader, nil
 		})

-	// We wrap the reader with a lazy reader so that the progress bar is only
-	// initialized if the file is read. Since we're not actually lazily reading
-	// data just use the eager item implementation.
-	oc.data <- data.NewUnindexedPrefetchedItem(
+	storeItem, err := data.NewUnindexedPrefetchedItem(
 		metaReader,
 		metaFileName+metaSuffix,
 		// Metadata file should always use the latest time as
 		// permissions change does not update mod time.
 		time.Now())
+	if err != nil {
+		errs.AddRecoverable(ctx, clues.Stack(err).
+			WithClues(ctx).
+			Label(fault.LabelForceNoBackupCreation))
+
+		return
+	}
+
+	// We wrap the reader with a lazy reader so that the progress bar is only
+	// initialized if the file is read. Since we're not actually lazily reading
+	// data just use the eager item implementation.
+	oc.data <- storeItem

 	// Item read successfully, add to collection
 	if isFile {

---

@@ -19,6 +19,7 @@ import (
 	"github.com/stretchr/testify/suite"

 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
 	metaTD "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata/testdata"
@@ -256,7 +257,7 @@ func (suite *CollectionUnitSuite) TestCollection() {
 		mt := readItem.(data.ItemModTime)
 		assert.Equal(t, now, mt.ModTime())

-		readData, err := io.ReadAll(readItem.ToReader())
+		rr, err := readers.NewVersionedRestoreReader(readItem.ToReader())
 		test.expectErr(t, err)

 		if err != nil {
@@ -267,13 +268,25 @@ func (suite *CollectionUnitSuite) TestCollection() {
 			return
 		}

+		assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+		assert.False(t, rr.Format().DelInFlight)
+
+		readData, err := io.ReadAll(rr)
+		require.NoError(t, err, clues.ToCore(err))
+
 		assert.Equal(t, stubItemContent, readData)

 		readItemMeta := readItems[1]

 		assert.Equal(t, stubItemID+metadata.MetaFileSuffix, readItemMeta.ID())

+		rr, err = readers.NewVersionedRestoreReader(readItemMeta.ToReader())
+		require.NoError(t, err, clues.ToCore(err))
+
+		assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+		assert.False(t, rr.Format().DelInFlight)
+
 		readMeta := metadata.Metadata{}
-		err = json.NewDecoder(readItemMeta.ToReader()).Decode(&readMeta)
+		err = json.NewDecoder(rr).Decode(&readMeta)
 		require.NoError(t, err, clues.ToCore(err))

 		metaTD.AssertMetadataEqual(t, stubMeta, readMeta)
@@ -485,12 +498,18 @@ func (suite *CollectionUnitSuite) TestCollectionPermissionBackupLatestModTime()
 	for _, i := range readItems {
 		if strings.HasSuffix(i.ID(), metadata.MetaFileSuffix) {
-			content, err := io.ReadAll(i.ToReader())
+			rr, err := readers.NewVersionedRestoreReader(i.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+			assert.False(t, rr.Format().DelInFlight)
+
+			content, err := io.ReadAll(rr)
 			require.NoError(t, err, clues.ToCore(err))
 			require.Equal(t, `{"filename":"Fake Item","permissionMode":1}`, string(content))

 			im, ok := i.(data.ItemModTime)
-			require.Equal(t, ok, true, "modtime interface")
+			require.True(t, ok, "modtime interface")
 			require.Greater(t, im.ModTime(), mtime, "permissions time greater than mod time")
 		}
 	}

---

@@ -16,6 +16,7 @@ import (
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
 	pmMock "github.com/alcionai/corso/src/internal/common/prefixmatcher/mock"
 	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
 	"github.com/alcionai/corso/src/internal/m365/graph"
 	odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
@@ -1114,7 +1115,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestDeserializeMetadata() {
 				func(*support.ControllerOperationStatus) {})
 			require.NoError(t, err, clues.ToCore(err))

-			cols = append(cols, data.NoFetchRestoreCollection{Collection: mc})
+			cols = append(cols, dataMock.NewUnversionedRestoreCollection(
+				t,
+				data.NoFetchRestoreCollection{Collection: mc}))
 		}

 		deltas, paths, canUsePreviousBackup, err := deserializeMetadata(ctx, cols)
@@ -2211,7 +2214,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
 				func(*support.ControllerOperationStatus) {})
 			assert.NoError(t, err, "creating metadata collection", clues.ToCore(err))

-			prevMetadata := []data.RestoreCollection{data.NoFetchRestoreCollection{Collection: mc}}
+			prevMetadata := []data.RestoreCollection{
+				dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: mc}),
+			}
 			errs := fault.New(true)
 			delList := prefixmatcher.NewStringSetBuilder()
@@ -2238,7 +2243,9 @@ func (suite *OneDriveCollectionsUnitSuite) TestGet() {
 			deltas, paths, _, err := deserializeMetadata(
 				ctx,
 				[]data.RestoreCollection{
-					data.NoFetchRestoreCollection{Collection: baseCol},
+					dataMock.NewUnversionedRestoreCollection(
+						t,
+						data.NoFetchRestoreCollection{Collection: baseCol}),
 				})
 			if !assert.NoError(t, err, "deserializing metadata", clues.ToCore(err)) {
 				continue

---

@@ -15,7 +15,9 @@ import (
 	inMock "github.com/alcionai/corso/src/internal/common/idname/mock"
 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
+	dataMock "github.com/alcionai/corso/src/internal/data/mock"
 	"github.com/alcionai/corso/src/internal/m365/graph"
 	"github.com/alcionai/corso/src/internal/m365/support"
 	"github.com/alcionai/corso/src/internal/operations/inject"
@@ -322,7 +324,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			require.NoError(t, err, clues.ToCore(err))

 			cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
-				data.NoFetchRestoreCollection{Collection: coll},
+				dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: coll}),
 			})
 			test.expectError(t, err, clues.ToCore(err))
@@ -591,7 +593,7 @@ func (suite *BackupIntgSuite) TestDelta() {
 	require.NotNil(t, metadata, "collections contains a metadata collection")

 	cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
-		data.NoFetchRestoreCollection{Collection: metadata},
+		dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: metadata}),
 	})
 	require.NoError(t, err, clues.ToCore(err))
 	assert.True(t, canUsePreviousBackup, "can use previous backup")
@@ -666,7 +668,12 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() {
 			for stream := range streamChannel {
 				buf := &bytes.Buffer{}

-				read, err := buf.ReadFrom(stream.ToReader())
+				rr, err := readers.NewVersionedRestoreReader(stream.ToReader())
+				require.NoError(t, err, clues.ToCore(err))
+
+				assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+				read, err := buf.ReadFrom(rr)
 				assert.NoError(t, err, clues.ToCore(err))
 				assert.NotZero(t, read)
@@ -744,7 +751,13 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
 				for stream := range edc.Items(ctx, fault.New(true)) {
 					buf := &bytes.Buffer{}
-					read, err := buf.ReadFrom(stream.ToReader())
+
+					rr, err := readers.NewVersionedRestoreReader(stream.ToReader())
+					require.NoError(t, err, clues.ToCore(err))
+
+					assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+					read, err := buf.ReadFrom(rr)
 					assert.NoError(t, err, clues.ToCore(err))
 					assert.NotZero(t, read)
@@ -878,7 +891,12 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() {
 				for item := range edc.Items(ctx, fault.New(true)) {
 					buf := &bytes.Buffer{}

-					read, err := buf.ReadFrom(item.ToReader())
+					rr, err := readers.NewVersionedRestoreReader(item.ToReader())
+					require.NoError(t, err, clues.ToCore(err))
+
+					assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
+
+					read, err := buf.ReadFrom(rr)
 					assert.NoError(t, err, clues.ToCore(err))
 					assert.NotZero(t, read)
@@ -1198,7 +1216,9 @@ func checkMetadata(
 ) {
 	catPaths, _, err := ParseMetadataCollections(
 		ctx,
-		[]data.RestoreCollection{data.NoFetchRestoreCollection{Collection: c}})
+		[]data.RestoreCollection{
+			dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: c}),
+		})
 	if !assert.NoError(t, err, "getting metadata", clues.ToCore(err)) {
 		return
 	}

---

@@ -278,10 +278,21 @@ func (col *prefetchCollection) streamItems(
 				return
 			}

-			stream <- data.NewPrefetchedItem(
+			item, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(itemData)),
 				id,
 				details.ItemInfo{Exchange: info})
+			if err != nil {
+				el.AddRecoverable(
+					ctx,
+					clues.Stack(err).
+						WithClues(ctx).
+						Label(fault.LabelForceNoBackupCreation))
+
+				return
+			}
+
+			stream <- item

 			atomic.AddInt64(&success, 1)
 			atomic.AddInt64(&totalBytes, info.Size)

---

@@ -17,6 +17,7 @@ import (
 	"golang.org/x/exp/slices"

 	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/collection/exchange/mock"
 	"github.com/alcionai/corso/src/internal/m365/graph"
@@ -55,13 +56,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
 		suite.Run(test.name, func() {
 			t := suite.T()

-			ed := data.NewPrefetchedItem(
+			ed, err := data.NewPrefetchedItem(
 				io.NopCloser(bytes.NewReader(test.readData)),
 				"itemID",
 				details.ItemInfo{})
+			require.NoError(t, err, clues.ToCore(err))
+
+			r, err := readers.NewVersionedRestoreReader(ed.ToReader())
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+			assert.False(t, r.Format().DelInFlight)

 			buf := &bytes.Buffer{}
-			_, err := buf.ReadFrom(ed.ToReader())
+			_, err = buf.ReadFrom(r)
 			assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
 			assert.Equal(t, test.readData, buf.Bytes(), "read data")
 			assert.Equal(t, "itemID", ed.ID(), "item ID")
@@ -493,11 +501,11 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() {
 		time.Now(),
 		fault.New(true))

-	_, err := li.(data.ItemInfo).Info()
+	_, err := li.Info()
 	assert.Error(suite.T(), err, "Info without reading data should error")
 }

-func (suite *CollectionUnitSuite) TestLazyItem() {
+func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() {
 	var (
 		parentPath = "inbox/private/silly cats"
 		now        = time.Now()
@@ -505,44 +513,19 @@ func (suite *CollectionUnitSuite) TestLazyItem() {
 	table := []struct {
 		name              string
-		modTime           time.Time
 		getErr            error
 		serializeErr      error
-		expectModTime     time.Time
 		expectReadErrType error
-		dataCheck         assert.ValueAssertionFunc
-		expectInfoErr     bool
-		expectInfoErrType error
 	}{
-		{
-			name:              "ReturnsEmptyReaderOnDeletedInFlight",
-			modTime:           now,
-			getErr:            graph.ErrDeletedInFlight,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
-			expectInfoErrType: data.ErrNotFound,
-		},
-		{
-			name:          "ReturnsValidReaderAndInfo",
-			modTime:       now,
-			dataCheck:     assert.NotEmpty,
-			expectModTime: now,
-		},
 		{
 			name:              "ReturnsErrorOnGenericGetError",
-			modTime:           now,
 			getErr:            assert.AnError,
 			expectReadErrType: assert.AnError,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
 		},
 		{
 			name:              "ReturnsErrorOnGenericSerializeError",
-			modTime:           now,
 			serializeErr:      assert.AnError,
 			expectReadErrType: assert.AnError,
-			dataCheck:         assert.Empty,
-			expectInfoErr:     true,
 		},
 	}
@@ -575,47 +558,128 @@
 					userID:       "userID",
 					itemID:       "itemID",
 					getter:       getter,
-					modTime:      test.modTime,
+					modTime:      now,
 					immutableIDs: false,
 					parentPath:   parentPath,
 				},
 				"itemID",
-				test.modTime,
+				now,
 				fault.New(true))

 			assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
-			assert.Equal(
-				t,
-				test.modTime,
-				li.(data.ItemModTime).ModTime(),
-				"mod time")
+			assert.Equal(t, now, li.ModTime(), "item mod time")

-			readData, err := io.ReadAll(li.ToReader())
-			if test.expectReadErrType == nil {
-				assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
-			} else {
-				assert.ErrorIs(t, err, test.expectReadErrType, "read error")
-			}
-
-			test.dataCheck(t, readData, "read item data")
-
-			info, err := li.(data.ItemInfo).Info()
-
-			// Didn't expect an error getting info, it should be valid.
-			if !test.expectInfoErr {
-				assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))
-
-				assert.Equal(t, parentPath, info.Exchange.ParentPath)
-				assert.Equal(t, test.expectModTime, info.Modified())
-
-				return
-			}
+			_, err := readers.NewVersionedRestoreReader(li.ToReader())
+			assert.ErrorIs(t, err, test.expectReadErrType)

 			// Should get some form of error when trying to get info.
+			_, err = li.Info()
 			assert.Error(t, err, "Info()")
-
-			if test.expectInfoErrType != nil {
-				assert.ErrorIs(t, err, test.expectInfoErrType, "Info() error")
-			}
 		})
 	}
 }
+
+func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() {
+	var (
+		t          = suite.T()
+		parentPath = "inbox/private/silly cats"
+		now        = time.Now()
+	)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	getter := &mock.ItemGetSerialize{GetErr: graph.ErrDeletedInFlight}
+
+	li := data.NewLazyItem(
+		ctx,
+		&lazyItemGetter{
+			userID:       "userID",
+			itemID:       "itemID",
+			getter:       getter,
+			modTime:      now,
+			immutableIDs: false,
+			parentPath:   parentPath,
+		},
+		"itemID",
+		now,
+		fault.New(true))
+
+	assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
+	assert.Equal(
+		t,
+		now,
+		li.ModTime(),
+		"item mod time")
+
+	r, err := readers.NewVersionedRestoreReader(li.ToReader())
+	require.NoError(t, err, clues.ToCore(err))
+
+	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
+	assert.True(t, r.Format().DelInFlight)
+
+	readData, err := io.ReadAll(r)
assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
assert.Empty(t, readData, "read item data")
_, err = li.Info()
assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
}
func (suite *CollectionUnitSuite) TestLazyItem() {
var (
t = suite.T()
parentPath = "inbox/private/silly cats"
now = time.Now()
)
ctx, flush := tester.NewContext(t)
defer flush()
// Exact data type doesn't really matter.
testData := models.NewMessage()
testData.SetSubject(ptr.To("hello world"))
getter := &mock.ItemGetSerialize{GetData: testData}
li := data.NewLazyItem(
ctx,
&lazyItemGetter{
userID: "userID",
itemID: "itemID",
getter: getter,
modTime: now,
immutableIDs: false,
parentPath: parentPath,
},
"itemID",
now,
fault.New(true))
assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
assert.Equal(
t,
now,
li.ModTime(),
"item mod time")
r, err := readers.NewVersionedRestoreReader(li.ToReader())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
assert.False(t, r.Format().DelInFlight)
readData, err := io.ReadAll(r)
assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
assert.NotEmpty(t, readData, "read item data")
info, err := li.Info()
assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))
assert.Equal(t, parentPath, info.Exchange.ParentPath)
assert.Equal(t, now, info.Modified())
}

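Taken together, the tests above pin down the lazy-item contract: Info() fails until the item's reader has been drained, because the fetch and serialization are deferred to first read. A sketch of that ordering, assuming ctx, now, and a getter like the mocks in these tests are in scope:

li := data.NewLazyItem(
	ctx,
	&lazyItemGetter{ /* getter, IDs, modTime as in the tests above */ },
	"itemID",
	now,
	fault.New(true))

// Nothing has been fetched yet, so item info must not be available.
if _, err := li.Info(); err == nil {
	t.Fatal("Info() should fail before the reader is drained")
}

rr, err := readers.NewVersionedRestoreReader(li.ToReader())
require.NoError(t, err, clues.ToCore(err))

// Draining the reader triggers the deferred fetch and populates info.
_, err = io.ReadAll(rr)
require.NoError(t, err, clues.ToCore(err))

_, err = li.Info()
require.NoError(t, err, clues.ToCore(err))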
View File

@ -150,27 +150,47 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
parentFolderID, parentFolderID,
id) id)
if err != nil { if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer")) el.AddRecoverable(
ctx,
clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
return return
} }
if err := writer.WriteObjectValue("", item); err != nil { if err := writer.WriteObjectValue("", item); err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "writing channel message to serializer")) el.AddRecoverable(
ctx,
clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
return return
} }
itemData, err := writer.GetSerializedContent() itemData, err := writer.GetSerializedContent()
if err != nil { if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "serializing channel message")) el.AddRecoverable(
ctx,
clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation))
return return
} }
info.ParentPath = col.LocationPath().String() info.ParentPath = col.LocationPath().String()
col.stream <- data.NewPrefetchedItem( storeItem, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(itemData)), io.NopCloser(bytes.NewReader(itemData)),
id, id,
details.ItemInfo{Groups: info}) details.ItemInfo{Groups: info})
if err != nil {
el.AddRecoverable(
ctx,
clues.Stack(err).
WithClues(ctx).
Label(fault.LabelForceNoBackupCreation))
return
}
col.stream <- storeItem
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size) atomic.AddInt64(&totalBytes, info.Size)

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/groups/mock" "github.com/alcionai/corso/src/internal/m365/collection/groups/mock"
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
@ -48,13 +49,20 @@ func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
suite.Run(test.name, func() { suite.Run(test.name, func() {
t := suite.T() t := suite.T()
ed := data.NewPrefetchedItem( ed, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(test.readData)), io.NopCloser(bytes.NewReader(test.readData)),
"itemID", "itemID",
details.ItemInfo{}) details.ItemInfo{})
require.NoError(t, err, clues.ToCore(err))
r, err := readers.NewVersionedRestoreReader(ed.ToReader())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
assert.False(t, r.Format().DelInFlight)
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
_, err := buf.ReadFrom(ed.ToReader()) _, err = buf.ReadFrom(r)
assert.NoError(t, err, "reading data: %v", clues.ToCore(err)) assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
assert.Equal(t, test.readData, buf.Bytes(), "read data") assert.Equal(t, test.readData, buf.Bytes(), "read data")
assert.Equal(t, "itemID", ed.ID(), "item ID") assert.Equal(t, "itemID", ed.ID(), "item ID")

View File

@ -211,11 +211,17 @@ func (sc *Collection) retrieveLists(
metrics.Bytes += size metrics.Bytes += size
metrics.Successes++ metrics.Successes++
sc.data <- data.NewPrefetchedItem(
item, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
ptr.Val(lst.GetId()), ptr.Val(lst.GetId()),
details.ItemInfo{SharePoint: ListToSPInfo(lst, size)}) details.ItemInfo{SharePoint: ListToSPInfo(lst, size)})
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
continue
}
sc.data <- item
progress <- struct{}{} progress <- struct{}{}
} }
} }
@ -272,11 +278,17 @@ func (sc *Collection) retrievePages(
if size > 0 { if size > 0 {
metrics.Bytes += size metrics.Bytes += size
metrics.Successes++ metrics.Successes++
sc.data <- data.NewPrefetchedItem(
item, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
ptr.Val(pg.GetId()), ptr.Val(pg.GetId()),
details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)}) details.ItemInfo{SharePoint: pageToSPInfo(pg, root, size)})
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
continue
}
sc.data <- item
progress <- struct{}{} progress <- struct{}{}
} }
} }

View File

@ -103,10 +103,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
byteArray, err := ow.GetSerializedContent() byteArray, err := ow.GetSerializedContent()
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
data := data.NewPrefetchedItem( data, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
name, name,
details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))}) details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))})
require.NoError(t, err, clues.ToCore(err))
return data return data
}, },
@ -132,10 +133,11 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
page, err := betaAPI.CreatePageFromBytes(byteArray) page, err := betaAPI.CreatePageFromBytes(byteArray)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
data := data.NewPrefetchedItem( data, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
itemName, itemName,
details.ItemInfo{SharePoint: betaAPI.PageInfo(page, int64(len(byteArray)))}) details.ItemInfo{SharePoint: betaAPI.PageInfo(page, int64(len(byteArray)))})
require.NoError(t, err, clues.ToCore(err))
return data return data
}, },
@ -194,10 +196,11 @@ func (suite *SharePointCollectionSuite) TestListCollection_Restore() {
byteArray, err := service.Serialize(listing) byteArray, err := service.Serialize(listing)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
listData := data.NewPrefetchedItem( listData, err := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
testName, testName,
details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))}) details.ItemInfo{SharePoint: ListToSPInfo(listing, int64(len(byteArray)))})
require.NoError(t, err, clues.ToCore(err))
destName := testdata.DefaultRestoreConfig("").Location destName := testdata.DefaultRestoreConfig("").Location

View File

@ -57,11 +57,16 @@ func (mce MetadataCollectionEntry) toMetadataItem() (metadataItem, error) {
return metadataItem{}, clues.Wrap(err, "serializing metadata") return metadataItem{}, clues.Wrap(err, "serializing metadata")
} }
item, err := data.NewUnindexedPrefetchedItem(
io.NopCloser(buf),
mce.fileName,
time.Now())
if err != nil {
return metadataItem{}, clues.Stack(err)
}
return metadataItem{ return metadataItem{
Item: data.NewUnindexedPrefetchedItem( Item: item,
io.NopCloser(buf),
mce.fileName,
time.Now()),
size: int64(buf.Len()), size: int64(buf.Len()),
}, nil }, nil
} }

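Since the metadata path uses the same constructors, bytes handed to an item should round-trip unchanged once the version header is stripped. A hypothetical round-trip check mirroring the reader tests elsewhere in this diff, assuming t is in scope:

payload := []byte(`{"key":"value"}`)

item, err := data.NewUnindexedPrefetchedItem(
	io.NopCloser(bytes.NewReader(payload)),
	"fileName",
	time.Now())
require.NoError(t, err, clues.ToCore(err))

rr, err := readers.NewVersionedRestoreReader(item.ToReader())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)

got, err := io.ReadAll(rr)
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, payload, got, "payload survives the version wrapper")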
View File

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
@ -69,13 +70,16 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
items := []metadataItem{} items := []metadataItem{}
for i := 0; i < len(itemNames); i++ { for i := 0; i < len(itemNames); i++ {
item, err := data.NewUnindexedPrefetchedItem(
io.NopCloser(bytes.NewReader(itemData[i])),
itemNames[i],
time.Time{})
require.NoError(t, err, clues.ToCore(err))
items = append( items = append(
items, items,
metadataItem{ metadataItem{
Item: data.NewUnindexedPrefetchedItem( Item: item,
io.NopCloser(bytes.NewReader(itemData[i])),
itemNames[i],
time.Time{}),
size: int64(len(itemData[i])), size: int64(len(itemData[i])),
}) })
} }
@ -103,7 +107,13 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
for s := range c.Items(ctx, fault.New(true)) { for s := range c.Items(ctx, fault.New(true)) {
gotNames = append(gotNames, s.ID()) gotNames = append(gotNames, s.ID())
buf, err := io.ReadAll(s.ToReader()) rr, err := readers.NewVersionedRestoreReader(s.ToReader())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
assert.False(t, rr.Format().DelInFlight)
buf, err := io.ReadAll(rr)
if !assert.NoError(t, err, clues.ToCore(err)) { if !assert.NoError(t, err, clues.ToCore(err)) {
continue continue
} }
@ -204,11 +214,17 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
for item := range col.Items(ctx, fault.New(true)) { for item := range col.Items(ctx, fault.New(true)) {
assert.Equal(t, test.metadata.fileName, item.ID()) assert.Equal(t, test.metadata.fileName, item.ID())
rr, err := readers.NewVersionedRestoreReader(item.ToReader())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version)
assert.False(t, rr.Format().DelInFlight)
gotMap := map[string]string{} gotMap := map[string]string{}
decoder := json.NewDecoder(item.ToReader()) decoder := json.NewDecoder(rr)
itemCount++ itemCount++
err := decoder.Decode(&gotMap) err = decoder.Decode(&gotMap)
if !assert.NoError(t, err, clues.ToCore(err)) { if !assert.NoError(t, err, clues.ToCore(err)) {
continue continue
} }

View File

@ -16,6 +16,7 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub" odStub "github.com/alcionai/corso/src/internal/m365/service/onedrive/stub"
@ -573,7 +574,12 @@ func compareExchangeEmail(
expected map[string][]byte, expected map[string][]byte,
item data.Item, item data.Item,
) { ) {
itemData, err := io.ReadAll(item.ToReader()) rr := versionedReadWrapper(t, item.ToReader())
if rr == nil {
return
}
itemData, err := io.ReadAll(rr)
if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
return return
} }
@ -600,7 +606,12 @@ func compareExchangeContact(
expected map[string][]byte, expected map[string][]byte,
item data.Item, item data.Item,
) { ) {
itemData, err := io.ReadAll(item.ToReader()) rr := versionedReadWrapper(t, item.ToReader())
if rr == nil {
return
}
itemData, err := io.ReadAll(rr)
if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
return return
} }
@ -628,7 +639,12 @@ func compareExchangeEvent(
expected map[string][]byte, expected map[string][]byte,
item data.Item, item data.Item,
) { ) {
itemData, err := io.ReadAll(item.ToReader()) rr := versionedReadWrapper(t, item.ToReader())
if rr == nil {
return
}
itemData, err := io.ReadAll(rr)
if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) { if !assert.NoError(t, err, "reading collection item", item.ID(), clues.ToCore(err)) {
return return
} }
@ -718,7 +734,12 @@ func compareDriveItem(
return false return false
} }
buf, err := io.ReadAll(item.ToReader()) rr := versionedReadWrapper(t, item.ToReader())
if rr == nil {
return true
}
buf, err := io.ReadAll(rr)
if !assert.NoError(t, err, clues.ToCore(err)) { if !assert.NoError(t, err, clues.ToCore(err)) {
return true return true
} }
@ -850,6 +871,29 @@ func compareDriveItem(
return true return true
} }
// versionedReadWrapper strips out the version format header and checks that
// it matches the current standard for all service types. If it doesn't, it
// returns nil. Otherwise it returns the wrapped VersionedRestoreReader.
func versionedReadWrapper(
t *testing.T,
reader io.ReadCloser,
) io.ReadCloser {
rr, err := readers.NewVersionedRestoreReader(reader)
if !assert.NoError(t, err, clues.ToCore(err)) {
return nil
}
if !assert.Equal(t, readers.DefaultSerializationVersion, rr.Format().Version) {
return nil
}
if !assert.False(t, rr.Format().DelInFlight) {
return nil
}
return rr
}
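Callers treat a nil return from versionedReadWrapper as "assertion already recorded, skip this item," which keeps the comparison helpers above flat instead of threading header checks through each of them.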
// compareItem compares the data returned by backup with the expected data. // compareItem compares the data returned by backup with the expected data.
// Returns true if a comparison was done else false. Bool return is mostly used // Returns true if a comparison was done else false. Bool return is mostly used
// to exclude OneDrive permissions for the root right now. // to exclude OneDrive permissions for the root right now.

View File

@ -109,10 +109,11 @@ func (suite *SharePointPageSuite) TestRestoreSinglePage() {
//nolint:lll //nolint:lll
byteArray := spMock.Page("Byte Test") byteArray := spMock.Page("Byte Test")
pageData := data.NewUnindexedPrefetchedItem( pageData, err := data.NewUnindexedPrefetchedItem(
io.NopCloser(bytes.NewReader(byteArray)), io.NopCloser(bytes.NewReader(byteArray)),
testName, testName,
time.Now()) time.Now())
require.NoError(t, err, clues.ToCore(err))
info, err := api.RestoreSitePage( info, err := api.RestoreSitePage(
ctx, ctx,

View File

@ -182,12 +182,17 @@ func collect(
return nil, clues.Wrap(err, "marshalling body").WithClues(ctx) return nil, clues.Wrap(err, "marshalling body").WithClues(ctx)
} }
item, err := data.NewUnindexedPrefetchedItem(
io.NopCloser(bytes.NewReader(bs)),
col.itemName,
time.Now())
if err != nil {
return nil, clues.Stack(err).WithClues(ctx)
}
dc := streamCollection{ dc := streamCollection{
folderPath: p, folderPath: p,
item: data.NewUnindexedPrefetchedItem( item: item,
io.NopCloser(bytes.NewReader(bs)),
col.itemName,
time.Now()),
} }
return &dc, nil return &dc, nil