Exch generic item (#4351)

Use generic item structs in Exchange code

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4191

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2023-09-25 16:16:12 -07:00 committed by GitHub
parent 0df876d106
commit cf3744bcfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 193 deletions

View File

@ -21,7 +21,7 @@ func NewDeletedItem(itemID string) Item {
// TODO(ashmrtn): This really doesn't need to be set since deleted items are
// never passed to the actual storage engine. Setting it for now so tests
// don't break.
modTime: time.Now(),
modTime: time.Now().UTC(),
}
}

View File

@ -12,7 +12,6 @@ import (
"time"
"github.com/alcionai/clues"
"github.com/spatialcurrent/go-lazy/pkg/lazy"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/data"
@ -27,9 +26,7 @@ import (
var (
_ data.BackupCollection = &prefetchCollection{}
_ data.Item = &Item{}
_ data.ItemInfo = &Item{}
_ data.ItemModTime = &Item{}
_ data.BackupCollection = &lazyFetchCollection{}
)
const (
@ -230,11 +227,7 @@ func (col *prefetchCollection) streamItems(
defer wg.Done()
defer func() { <-semaphoreCh }()
stream <- &Item{
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
}
stream <- data.NewDeletedItem(id)
atomic.AddInt64(&success, 1)
@ -285,12 +278,10 @@ func (col *prefetchCollection) streamItems(
return
}
stream <- &Item{
id: id,
message: itemData,
info: info,
modTime: info.Modified,
}
stream <- data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(itemData)),
id,
details.ItemInfo{Exchange: info})
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, info.Size)
@ -377,11 +368,7 @@ func (col *lazyFetchCollection) streamItems(
// delete all removed items
for id := range col.removed {
stream <- &Item{
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
}
stream <- data.NewDeletedItem(id)
atomic.AddInt64(&success, 1)
@ -405,16 +392,19 @@ func (col *lazyFetchCollection) streamItems(
"service", path.ExchangeService.String(),
"category", col.Category().String())
stream <- &lazyItem{
ctx: ictx,
userID: user,
id: id,
getter: col.getter,
modTime: modTime,
immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs,
parentPath: parentPath,
errs: errs,
}
stream <- data.NewLazyItem(
ictx,
&lazyItemGetter{
userID: user,
itemID: id,
getter: col.getter,
modTime: modTime,
immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs,
parentPath: parentPath,
},
id,
modTime,
errs)
atomic.AddInt64(&success, 1)
@ -424,137 +414,48 @@ func (col *lazyFetchCollection) streamItems(
}
}
// Item represents a single item retrieved from exchange
type Item struct {
id string
// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
message []byte
info *details.ExchangeInfo // temporary change to bring populate function into directory
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
// request to provide modtime in ItemInfo structs.
modTime time.Time
// true if the item was marked by graph as deleted.
deleted bool
}
func (i *Item) ID() string {
return i.id
}
func (i *Item) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(i.message))
}
func (i Item) Deleted() bool {
return i.deleted
}
func (i *Item) Info() (details.ItemInfo, error) {
return details.ItemInfo{Exchange: i.info}, nil
}
func (i *Item) ModTime() time.Time {
return i.modTime
}
func NewItem(
identifier string,
dataBytes []byte,
detail details.ExchangeInfo,
modTime time.Time,
) Item {
return Item{
id: identifier,
message: dataBytes,
info: &detail,
modTime: modTime,
}
}
// lazyItem represents a single item retrieved from exchange that lazily fetches
// the item's data when the first call to ToReader().Read() is made.
type lazyItem struct {
ctx context.Context
userID string
id string
parentPath string
getter itemGetterSerializer
errs *fault.Bus
modTime time.Time
// info holds the Exchnage-specific details information for this item. Store
// a pointer in this struct so the golang garbage collector can collect the
// Item struct once kopia is done with it. The ExchangeInfo struct needs to
// stick around until the end of the backup though as backup details is
// written last.
info *details.ExchangeInfo
type lazyItemGetter struct {
getter itemGetterSerializer
userID string
itemID string
parentPath string
modTime time.Time
immutableIDs bool
delInFlight bool
}
func (i lazyItem) ID() string {
return i.id
}
func (i *lazyItem) ToReader() io.ReadCloser {
return lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
itemData, info, err := getItemAndInfo(
i.ctx,
i.getter,
i.userID,
i.ID(),
i.immutableIDs,
i.parentPath)
if err != nil {
// If an item was deleted then return an empty file so we don't fail
// the backup and return a sentinel error when asked for ItemInfo so
// we don't display the item in the backup.
//
// The item will be deleted from kopia on the next backup when the
// delta token shows it's removed.
if graph.IsErrDeletedInFlight(err) {
logger.CtxErr(i.ctx, err).Info("item not found")
i.delInFlight = true
return io.NopCloser(bytes.NewReader([]byte{})), nil
}
err = clues.Stack(err)
i.errs.AddRecoverable(i.ctx, err)
return nil, err
func (lig *lazyItemGetter) GetData(
ctx context.Context,
) (io.ReadCloser, *details.ItemInfo, bool, error) {
itemData, info, err := getItemAndInfo(
ctx,
lig.getter,
lig.userID,
lig.itemID,
lig.immutableIDs,
lig.parentPath)
if err != nil {
// If an item was deleted then return an empty file so we don't fail
// the backup and return a sentinel error when asked for ItemInfo so
// we don't display the item in the backup.
//
// The item will be deleted from kopia on the next backup when the
// delta token shows it's removed.
if graph.IsErrDeletedInFlight(err) {
logger.CtxErr(ctx, err).Info("item not found")
return nil, nil, true, nil
}
i.info = info
// Update the mod time to what we already told kopia about. This is required
// for proper details merging.
i.info.Modified = i.modTime
err = clues.Stack(err)
return io.NopCloser(bytes.NewReader(itemData)), nil
})
}
func (i lazyItem) Deleted() bool {
return false
}
func (i lazyItem) Info() (details.ItemInfo, error) {
if i.delInFlight {
return details.ItemInfo{}, clues.Stack(data.ErrNotFound).WithClues(i.ctx)
} else if i.info == nil {
return details.ItemInfo{}, clues.New("requesting ItemInfo before data retrieval").
WithClues(i.ctx)
return nil, nil, false, err
}
return details.ItemInfo{Exchange: i.info}, nil
}
// Update the mod time to what we already told kopia about. This is required
// for proper details merging.
info.Modified = lig.modTime
func (i lazyItem) ModTime() time.Time {
return i.modTime
return io.NopCloser(bytes.NewReader(itemData)),
&details.ItemInfo{Exchange: info},
false,
nil
}

View File

@ -36,31 +36,37 @@ func TestCollectionUnitSuite(t *testing.T) {
suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *CollectionUnitSuite) TestReader_Valid() {
m := []byte("test message")
description := "aFile"
ed := &Item{id: description, message: m}
func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
table := []struct {
name string
readData []byte
}{
{
name: "HasData",
readData: []byte("test message"),
},
{
name: "Empty",
readData: []byte{},
},
}
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(ed.ToReader())
assert.NoError(suite.T(), err, clues.ToCore(err))
assert.Equal(suite.T(), buf.Bytes(), m)
assert.Equal(suite.T(), description, ed.ID())
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
func (suite *CollectionUnitSuite) TestReader_Empty() {
var (
empty []byte
expected int64
t = suite.T()
)
ed := data.NewPrefetchedItem(
io.NopCloser(bytes.NewReader(test.readData)),
"itemID",
details.ItemInfo{})
ed := &Item{message: empty}
buf := &bytes.Buffer{}
received, err := buf.ReadFrom(ed.ToReader())
assert.Equal(t, expected, received)
assert.NoError(t, err, clues.ToCore(err))
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(ed.ToReader())
assert.NoError(t, err, "reading data: %v", clues.ToCore(err))
assert.Equal(t, test.readData, buf.Bytes(), "read data")
assert.Equal(t, "itemID", ed.ID(), "item ID")
})
}
}
func (suite *CollectionUnitSuite) TestNewCollection_state() {
@ -480,9 +486,14 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() {
ctx, flush := tester.NewContext(t)
defer flush()
li := lazyItem{ctx: ctx}
li := data.NewLazyItem(
ctx,
nil,
"itemID",
time.Now(),
fault.New(true))
_, err := li.Info()
_, err := li.(data.ItemInfo).Info()
assert.Error(suite.T(), err, "Info without reading data should error")
}
@ -558,30 +569,37 @@ func (suite *CollectionUnitSuite) TestLazyItem() {
SerializeErr: test.serializeErr,
}
li := &lazyItem{
ctx: ctx,
userID: "userID",
id: "itemID",
parentPath: parentPath,
getter: getter,
errs: fault.New(true),
modTime: test.modTime,
immutableIDs: false,
}
li := data.NewLazyItem(
ctx,
&lazyItemGetter{
userID: "userID",
itemID: "itemID",
getter: getter,
modTime: test.modTime,
immutableIDs: false,
parentPath: parentPath,
},
"itemID",
test.modTime,
fault.New(true))
assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
assert.Equal(t, test.modTime, li.ModTime(), "item mod time")
assert.Equal(
t,
test.modTime,
li.(data.ItemModTime).ModTime(),
"item mod time")
data, err := io.ReadAll(li.ToReader())
readData, err := io.ReadAll(li.ToReader())
if test.expectReadErrType == nil {
assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
} else {
assert.ErrorIs(t, err, test.expectReadErrType, "read error")
}
test.dataCheck(t, data, "read item data")
test.dataCheck(t, readData, "read item data")
info, err := li.Info()
info, err := li.(data.ItemInfo).Info()
// Didn't expect an error getting info, it should be valid.
if !test.expectInfoErr {