Make generic version of item structs (#4350)
Generic structs implement items that have details associated with them. Items that don't have details will need a separate struct since we use a downcast to determine if we can get details. --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No #### Type of change - [ ] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [x] 🧹 Tech Debt/Cleanup #### Issue(s) * #4191 #### Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
c81a769c76
commit
a3dabaa589
170
src/internal/data/item.go
Normal file
170
src/internal/data/item.go
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
package data
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/spatialcurrent/go-lazy/pkg/lazy"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
|
"github.com/alcionai/corso/src/pkg/fault"
|
||||||
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewDeletedItem(itemID string) Item {
|
||||||
|
return &prefetchedItem{
|
||||||
|
id: itemID,
|
||||||
|
deleted: true,
|
||||||
|
// TODO(ashmrtn): This really doesn't need to be set since deleted items are
|
||||||
|
// never passed to the actual storage engine. Setting it for now so tests
|
||||||
|
// don't break.
|
||||||
|
modTime: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPrefetchedItem(
|
||||||
|
reader io.ReadCloser,
|
||||||
|
itemID string,
|
||||||
|
info details.ItemInfo,
|
||||||
|
) Item {
|
||||||
|
return &prefetchedItem{
|
||||||
|
id: itemID,
|
||||||
|
reader: reader,
|
||||||
|
info: info,
|
||||||
|
modTime: info.Modified(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefetchedItem represents a single item retrieved from the remote service.
|
||||||
|
type prefetchedItem struct {
|
||||||
|
id string
|
||||||
|
reader io.ReadCloser
|
||||||
|
info details.ItemInfo
|
||||||
|
// modTime is the modified time of the item. It should match the modTime in
|
||||||
|
// info if info is present. Here as a separate field so that deleted items
|
||||||
|
// don't error out by trying to source it from info.
|
||||||
|
modTime time.Time
|
||||||
|
|
||||||
|
// deleted flags if this item has been removed in the remote service and
|
||||||
|
// should be removed in storage as well.
|
||||||
|
deleted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i prefetchedItem) ID() string {
|
||||||
|
return i.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *prefetchedItem) ToReader() io.ReadCloser {
|
||||||
|
return i.reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i prefetchedItem) Deleted() bool {
|
||||||
|
return i.deleted
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i prefetchedItem) Info() (details.ItemInfo, error) {
|
||||||
|
return i.info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i prefetchedItem) ModTime() time.Time {
|
||||||
|
return i.modTime
|
||||||
|
}
|
||||||
|
|
||||||
|
type ItemDataGetter interface {
|
||||||
|
GetData(context.Context) (io.ReadCloser, *details.ItemInfo, bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLazyItem(
|
||||||
|
ctx context.Context,
|
||||||
|
itemGetter ItemDataGetter,
|
||||||
|
itemID string,
|
||||||
|
modTime time.Time,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) Item {
|
||||||
|
return &lazyItem{
|
||||||
|
ctx: ctx,
|
||||||
|
id: itemID,
|
||||||
|
itemGetter: itemGetter,
|
||||||
|
modTime: modTime,
|
||||||
|
errs: errs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lazyItem represents a single item retrieved from the remote service. It
|
||||||
|
// lazily fetches the item's data when the first call to ToReader().Read() is
|
||||||
|
// made.
|
||||||
|
type lazyItem struct {
|
||||||
|
ctx context.Context
|
||||||
|
id string
|
||||||
|
errs *fault.Bus
|
||||||
|
itemGetter ItemDataGetter
|
||||||
|
|
||||||
|
modTime time.Time
|
||||||
|
// info holds the details information for this item. Store a pointer in this
|
||||||
|
// struct so we can tell if it's been set already or not.
|
||||||
|
//
|
||||||
|
// This also helps with garbage collection because now the golang garbage
|
||||||
|
// collector can collect the lazyItem struct once the storage engine is done
|
||||||
|
// with it. The ItemInfo struct needs to stick around until the end of the
|
||||||
|
// backup though as backup details is written last.
|
||||||
|
info *details.ItemInfo
|
||||||
|
|
||||||
|
delInFlight bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i lazyItem) ID() string {
|
||||||
|
return i.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *lazyItem) ToReader() io.ReadCloser {
|
||||||
|
return lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||||
|
reader, info, delInFlight, err := i.itemGetter.GetData(i.ctx)
|
||||||
|
if err != nil {
|
||||||
|
err = clues.Stack(err)
|
||||||
|
i.errs.AddRecoverable(i.ctx, err)
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If an item was deleted then return an empty file so we don't fail the
|
||||||
|
// backup and return a sentinel error when asked for ItemInfo so we don't
|
||||||
|
// display the item in the backup.
|
||||||
|
//
|
||||||
|
// The item will be deleted from storage on the next backup when either the
|
||||||
|
// delta token shows it's removed or we do a full backup (token expired
|
||||||
|
// etc.) and the item isn't enumerated in that set.
|
||||||
|
if delInFlight {
|
||||||
|
logger.Ctx(i.ctx).Info("item not found")
|
||||||
|
|
||||||
|
i.delInFlight = true
|
||||||
|
|
||||||
|
return io.NopCloser(bytes.NewReader([]byte{})), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i.info = info
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i lazyItem) Deleted() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i lazyItem) Info() (details.ItemInfo, error) {
|
||||||
|
if i.delInFlight {
|
||||||
|
return details.ItemInfo{}, clues.Stack(ErrNotFound).WithClues(i.ctx)
|
||||||
|
} else if i.info == nil {
|
||||||
|
return details.ItemInfo{}, clues.New("requesting ItemInfo before data retrieval").
|
||||||
|
WithClues(i.ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return *i.info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i lazyItem) ModTime() time.Time {
|
||||||
|
return i.modTime
|
||||||
|
}
|
||||||
343
src/internal/data/item_test.go
Normal file
343
src/internal/data/item_test.go
Normal file
@ -0,0 +1,343 @@
|
|||||||
|
package data_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
|
"github.com/alcionai/corso/src/pkg/fault"
|
||||||
|
)
|
||||||
|
|
||||||
|
type errReader struct {
|
||||||
|
io.ReadCloser
|
||||||
|
readCount int
|
||||||
|
errAfter int
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (er *errReader) Read(p []byte) (int, error) {
|
||||||
|
if er.err != nil && er.readCount == er.errAfter {
|
||||||
|
return 0, er.err
|
||||||
|
}
|
||||||
|
|
||||||
|
toRead := len(p)
|
||||||
|
if er.readCount+toRead > er.errAfter {
|
||||||
|
toRead = er.errAfter - er.readCount
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := er.ReadCloser.Read(p[:toRead])
|
||||||
|
er.readCount += n
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
type ItemUnitSuite struct {
|
||||||
|
tester.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestItemUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &ItemUnitSuite{Suite: tester.NewUnitSuite(t)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ItemUnitSuite) TestDeletedItem() {
|
||||||
|
var (
|
||||||
|
t = suite.T()
|
||||||
|
|
||||||
|
id = "foo"
|
||||||
|
item = data.NewDeletedItem(id)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert.Equal(t, id, item.ID(), "ID")
|
||||||
|
assert.True(t, item.Deleted(), "deleted")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ItemUnitSuite) TestPrefetchedItem() {
|
||||||
|
var (
|
||||||
|
id = "foo"
|
||||||
|
now = time.Now()
|
||||||
|
|
||||||
|
baseData = []byte("hello world")
|
||||||
|
)
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
reader io.ReadCloser
|
||||||
|
info details.ItemInfo
|
||||||
|
|
||||||
|
readErr require.ErrorAssertionFunc
|
||||||
|
expectData []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "EmptyReader",
|
||||||
|
reader: io.NopCloser(bytes.NewReader([]byte{})),
|
||||||
|
info: details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
|
||||||
|
readErr: require.NoError,
|
||||||
|
expectData: []byte{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData",
|
||||||
|
reader: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
info: details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
|
||||||
|
readErr: require.NoError,
|
||||||
|
expectData: baseData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData DifferentService",
|
||||||
|
reader: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
info: details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
|
||||||
|
readErr: require.NoError,
|
||||||
|
expectData: baseData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData ReadError",
|
||||||
|
reader: &errReader{
|
||||||
|
ReadCloser: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
errAfter: 5,
|
||||||
|
err: assert.AnError,
|
||||||
|
},
|
||||||
|
info: details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
|
||||||
|
readErr: require.Error,
|
||||||
|
expectData: baseData[:5],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
item := data.NewPrefetchedItem(test.reader, id, test.info)
|
||||||
|
|
||||||
|
assert.Equal(t, id, item.ID(), "ID")
|
||||||
|
assert.False(t, item.Deleted(), "deleted")
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
test.info.Modified(),
|
||||||
|
item.(data.ItemModTime).ModTime(),
|
||||||
|
"mod time")
|
||||||
|
|
||||||
|
readData, err := io.ReadAll(item.ToReader())
|
||||||
|
test.readErr(t, err, clues.ToCore(err), "read error")
|
||||||
|
assert.Equal(t, test.expectData, readData, "read data")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockItemDataGetter struct {
|
||||||
|
getCalled bool
|
||||||
|
|
||||||
|
reader io.ReadCloser
|
||||||
|
info *details.ItemInfo
|
||||||
|
delInFlight bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mid *mockItemDataGetter) check(t *testing.T, expectCalled bool) {
|
||||||
|
assert.Equal(t, expectCalled, mid.getCalled, "GetData() called")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mid *mockItemDataGetter) GetData(
|
||||||
|
context.Context,
|
||||||
|
) (io.ReadCloser, *details.ItemInfo, bool, error) {
|
||||||
|
mid.getCalled = true
|
||||||
|
return mid.reader, mid.info, mid.delInFlight, mid.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ItemUnitSuite) TestLazyItem() {
|
||||||
|
var (
|
||||||
|
id = "foo"
|
||||||
|
now = time.Now()
|
||||||
|
|
||||||
|
baseData = []byte("hello world")
|
||||||
|
)
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
mid *mockItemDataGetter
|
||||||
|
readErr assert.ErrorAssertionFunc
|
||||||
|
infoErr assert.ErrorAssertionFunc
|
||||||
|
expectData []byte
|
||||||
|
expectBusErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "EmptyReader",
|
||||||
|
mid: &mockItemDataGetter{
|
||||||
|
reader: io.NopCloser(bytes.NewReader([]byte{})),
|
||||||
|
info: &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
|
||||||
|
},
|
||||||
|
readErr: assert.NoError,
|
||||||
|
infoErr: assert.NoError,
|
||||||
|
expectData: []byte{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData",
|
||||||
|
mid: &mockItemDataGetter{
|
||||||
|
reader: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
info: &details.ItemInfo{Exchange: &details.ExchangeInfo{Modified: now}},
|
||||||
|
},
|
||||||
|
readErr: assert.NoError,
|
||||||
|
infoErr: assert.NoError,
|
||||||
|
expectData: baseData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData",
|
||||||
|
mid: &mockItemDataGetter{
|
||||||
|
reader: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
|
||||||
|
},
|
||||||
|
readErr: assert.NoError,
|
||||||
|
infoErr: assert.NoError,
|
||||||
|
expectData: baseData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData GetDataError",
|
||||||
|
mid: &mockItemDataGetter{
|
||||||
|
err: assert.AnError,
|
||||||
|
},
|
||||||
|
readErr: assert.Error,
|
||||||
|
infoErr: assert.Error,
|
||||||
|
expectData: []byte{},
|
||||||
|
expectBusErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ReaderWithData ReadError",
|
||||||
|
mid: &mockItemDataGetter{
|
||||||
|
reader: &errReader{
|
||||||
|
ReadCloser: io.NopCloser(bytes.NewReader(baseData)),
|
||||||
|
errAfter: 5,
|
||||||
|
err: assert.AnError,
|
||||||
|
},
|
||||||
|
info: &details.ItemInfo{OneDrive: &details.OneDriveInfo{Modified: now}},
|
||||||
|
},
|
||||||
|
readErr: assert.Error,
|
||||||
|
infoErr: assert.NoError,
|
||||||
|
expectData: baseData[:5],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
errs := fault.New(true)
|
||||||
|
|
||||||
|
defer test.mid.check(t, true)
|
||||||
|
|
||||||
|
item := data.NewLazyItem(
|
||||||
|
ctx,
|
||||||
|
test.mid,
|
||||||
|
id,
|
||||||
|
now,
|
||||||
|
errs)
|
||||||
|
|
||||||
|
assert.Equal(t, id, item.ID(), "ID")
|
||||||
|
assert.False(t, item.Deleted(), "deleted")
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
now,
|
||||||
|
item.(data.ItemModTime).ModTime(),
|
||||||
|
"mod time")
|
||||||
|
|
||||||
|
// Read data to execute lazy reader.
|
||||||
|
readData, err := io.ReadAll(item.ToReader())
|
||||||
|
test.readErr(t, err, clues.ToCore(err), "read error")
|
||||||
|
assert.Equal(t, test.expectData, readData, "read data")
|
||||||
|
|
||||||
|
_, err = item.(data.ItemInfo).Info()
|
||||||
|
test.infoErr(t, err, "Info(): %v", clues.ToCore(err))
|
||||||
|
|
||||||
|
e := errs.Errors()
|
||||||
|
|
||||||
|
if !test.expectBusErr {
|
||||||
|
assert.Nil(t, e.Failure, "hard failure")
|
||||||
|
assert.Empty(t, e.Recovered, "recovered")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NotNil(t, e.Failure, "hard failure")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() {
|
||||||
|
var (
|
||||||
|
id = "foo"
|
||||||
|
now = time.Now()
|
||||||
|
)
|
||||||
|
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
errs := fault.New(true)
|
||||||
|
|
||||||
|
mid := &mockItemDataGetter{delInFlight: true}
|
||||||
|
defer mid.check(t, true)
|
||||||
|
|
||||||
|
item := data.NewLazyItem(ctx, mid, id, now, errs)
|
||||||
|
|
||||||
|
assert.Equal(t, id, item.ID(), "ID")
|
||||||
|
assert.False(t, item.Deleted(), "deleted")
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
now,
|
||||||
|
item.(data.ItemModTime).ModTime(),
|
||||||
|
"mod time")
|
||||||
|
|
||||||
|
// Read data to execute lazy reader.
|
||||||
|
readData, err := io.ReadAll(item.ToReader())
|
||||||
|
require.NoError(t, err, clues.ToCore(err), "read error")
|
||||||
|
assert.Empty(t, readData, "read data")
|
||||||
|
|
||||||
|
_, err = item.(data.ItemInfo).Info()
|
||||||
|
assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
|
||||||
|
|
||||||
|
e := errs.Errors()
|
||||||
|
|
||||||
|
assert.Nil(t, e.Failure, "hard failure")
|
||||||
|
assert.Empty(t, e.Recovered, "recovered")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() {
|
||||||
|
var (
|
||||||
|
id = "foo"
|
||||||
|
now = time.Now()
|
||||||
|
)
|
||||||
|
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
errs := fault.New(true)
|
||||||
|
|
||||||
|
mid := &mockItemDataGetter{}
|
||||||
|
defer mid.check(t, false)
|
||||||
|
|
||||||
|
item := data.NewLazyItem(ctx, mid, id, now, errs)
|
||||||
|
|
||||||
|
assert.Equal(t, id, item.ID(), "ID")
|
||||||
|
assert.False(t, item.Deleted(), "deleted")
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
now,
|
||||||
|
item.(data.ItemModTime).ModTime(),
|
||||||
|
"mod time")
|
||||||
|
|
||||||
|
_, err := item.(data.ItemInfo).Info()
|
||||||
|
assert.Error(t, err, "Info() error")
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user