Add tests for prefetch streamItems
This commit is contained in:
parent
152d78de21
commit
b0a8b310d3
@ -180,9 +180,8 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
|
|||||||
defer func() { <-semaphoreCh }()
|
defer func() { <-semaphoreCh }()
|
||||||
|
|
||||||
// This is a no-op for conversations, as there is no way to detect
|
// This is a no-op for conversations, as there is no way to detect
|
||||||
// deleted items in a conversation. It might be added in the future
|
// deleted items. It might be added in future if graph supports it,
|
||||||
// if graph supports it, so make sure we put up both .data and .meta
|
// so make sure we put up both .data and .meta files for deletions.
|
||||||
// files for deletions.
|
|
||||||
if col.getAndAugment.supportsItemMetadata() {
|
if col.getAndAugment.supportsItemMetadata() {
|
||||||
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
|
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
|
||||||
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
|
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
|
||||||
|
|||||||
@ -185,7 +185,8 @@ func (getAndAugmentChannelMessage) supportsItemMetadata() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
|
// Test prefetch streamItems() for channel messages.
|
||||||
|
func (suite *CollectionUnitSuite) TestPrefetchCollection_ChannelMessages() {
|
||||||
var (
|
var (
|
||||||
t = suite.T()
|
t = suite.T()
|
||||||
start = time.Now().Add(-1 * time.Second)
|
start = time.Now().Add(-1 * time.Second)
|
||||||
@ -264,6 +265,11 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
|
|||||||
go col.streamItems(ctx, errs)
|
go col.streamItems(ctx, errs)
|
||||||
|
|
||||||
for item := range col.stream {
|
for item := range col.stream {
|
||||||
|
// Make sure item IDs don't have .data and .meta suffixes.
|
||||||
|
// Those are only meant for conversations.
|
||||||
|
assert.False(t, strings.HasSuffix(item.ID(), ".data"))
|
||||||
|
assert.False(t, strings.HasSuffix(item.ID(), ".meta"))
|
||||||
|
|
||||||
itemCount++
|
itemCount++
|
||||||
|
|
||||||
_, aok := test.added[item.ID()]
|
_, aok := test.added[item.ID()]
|
||||||
@ -292,6 +298,161 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test prefetch streamItems() for conversations.
|
||||||
|
func (suite *CollectionUnitSuite) TestPrefetchCollection_Conversations() {
|
||||||
|
var (
|
||||||
|
t = suite.T()
|
||||||
|
start = time.Now().Add(-1 * time.Second)
|
||||||
|
statusUpdater = func(*support.ControllerOperationStatus) {}
|
||||||
|
)
|
||||||
|
|
||||||
|
fullPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s")
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
locPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s")
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
added map[string]time.Time
|
||||||
|
removed map[string]struct{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no items",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "only added items",
|
||||||
|
added: map[string]time.Time{
|
||||||
|
"fisher": {},
|
||||||
|
"flannigan": {},
|
||||||
|
"fitzbog": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "only removed items",
|
||||||
|
removed: map[string]struct{}{
|
||||||
|
"princess": {},
|
||||||
|
"poppy": {},
|
||||||
|
"petunia": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "added and removed items",
|
||||||
|
added: map[string]time.Time{
|
||||||
|
"goblin": {},
|
||||||
|
},
|
||||||
|
removed: map[string]struct{}{
|
||||||
|
"general": {},
|
||||||
|
"goose": {},
|
||||||
|
"grumbles": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
var (
|
||||||
|
t = suite.T()
|
||||||
|
errs = fault.New(true)
|
||||||
|
itemCount int
|
||||||
|
itemMap = map[string]data.Item{}
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
col := &prefetchCollection[models.Conversationable, models.Postable]{
|
||||||
|
BaseCollection: data.NewBaseCollection(
|
||||||
|
fullPath,
|
||||||
|
nil,
|
||||||
|
locPath.ToBuilder(),
|
||||||
|
control.DefaultOptions(),
|
||||||
|
false,
|
||||||
|
count.New()),
|
||||||
|
added: test.added,
|
||||||
|
contains: container[models.Conversationable]{},
|
||||||
|
removed: test.removed,
|
||||||
|
getAndAugment: &getAndAugmentConversation{},
|
||||||
|
stream: make(chan data.Item),
|
||||||
|
statusUpdater: statusUpdater,
|
||||||
|
}
|
||||||
|
|
||||||
|
go col.streamItems(ctx, errs)
|
||||||
|
|
||||||
|
for item := range col.stream {
|
||||||
|
var trimmedID string
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case strings.HasSuffix(item.ID(), ".data"):
|
||||||
|
trimmedID = strings.TrimSuffix(item.ID(), ".data")
|
||||||
|
case strings.HasSuffix(item.ID(), ".meta"):
|
||||||
|
trimmedID = strings.TrimSuffix(item.ID(), ".meta")
|
||||||
|
default:
|
||||||
|
assert.Fail(t, "unexpected item suffix: %s", item.ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
itemCount++
|
||||||
|
itemMap[item.ID()] = item
|
||||||
|
|
||||||
|
_, aok := test.added[trimmedID]
|
||||||
|
if aok {
|
||||||
|
assert.False(t, item.Deleted(), "additions should not be marked as deleted")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, rok := test.removed[trimmedID]
|
||||||
|
if rok {
|
||||||
|
assert.True(t, item.Deleted(), "removals should be marked as deleted")
|
||||||
|
dimt, ok := item.(data.ItemModTime)
|
||||||
|
require.True(t, ok, "item implements data.ItemModTime")
|
||||||
|
assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.NoError(t, errs.Failure())
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
2*(len(test.added)+len(test.removed)),
|
||||||
|
itemCount,
|
||||||
|
"should see all expected items")
|
||||||
|
|
||||||
|
addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...)
|
||||||
|
for _, id := range addedAndRemoved {
|
||||||
|
// Should have a .data and a .meta file
|
||||||
|
d, ok := itemMap[id+".data"]
|
||||||
|
assert.True(t, ok, "should have data file for %q", id)
|
||||||
|
|
||||||
|
m, ok := itemMap[id+".meta"]
|
||||||
|
assert.True(t, ok, "should have meta file for %q", id)
|
||||||
|
|
||||||
|
// Meta files should not have item info.
|
||||||
|
assert.Implements(t, (*data.Item)(nil), m)
|
||||||
|
|
||||||
|
if slices.Contains(maps.Keys(test.removed), id) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mod times should match. Not doing this check for removed items
|
||||||
|
// since mod time is set to now() for them.
|
||||||
|
assert.Equal(
|
||||||
|
t,
|
||||||
|
d.(data.ItemModTime).ModTime(),
|
||||||
|
m.(data.ItemModTime).ModTime(),
|
||||||
|
"item mod time")
|
||||||
|
|
||||||
|
// Read meta file data. The data is of no significance, we just want
|
||||||
|
// to make sure the file is readable.
|
||||||
|
r := m.ToReader()
|
||||||
|
|
||||||
|
_, err := io.ReadAll(r)
|
||||||
|
assert.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
r.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type getAndAugmentConversation struct {
|
type getAndAugmentConversation struct {
|
||||||
GetItemErr error
|
GetItemErr error
|
||||||
GetMetaErr error
|
GetMetaErr error
|
||||||
@ -341,6 +502,7 @@ func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
|
|||||||
assert.Equal(t, expected, m.CallIDs, "expected calls")
|
assert.Equal(t, expected, m.CallIDs, "expected calls")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test lazy streamItems() for conversations.
|
||||||
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() {
|
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() {
|
||||||
var (
|
var (
|
||||||
t = suite.T()
|
t = suite.T()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user