diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index e3020c2aa..b3f7d7191 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -180,9 +180,8 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul defer func() { <-semaphoreCh }() // This is a no-op for conversations, as there is no way to detect - // deleted items in a conversation. It might be added in the future - // if graph supports it, so make sure we put up both .data and .meta - // files for deletions. + // deleted items. It might be added in future if graph supports it, + // so make sure we put up both .data and .meta files for deletions. if col.getAndAugment.supportsItemMetadata() { col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix) col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix) diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index f971715f7..b622c8546 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -185,7 +185,8 @@ func (getAndAugmentChannelMessage) supportsItemMetadata() bool { return false } -func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { +// Test prefetch streamItems() for channel messages. +func (suite *CollectionUnitSuite) TestPrefetchCollection_ChannelMessages() { var ( t = suite.T() start = time.Now().Add(-1 * time.Second) @@ -264,6 +265,11 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { go col.streamItems(ctx, errs) for item := range col.stream { + // Make sure item IDs don't have .data and .meta suffixes. + // Those are only meant for conversations. + assert.False(t, strings.HasSuffix(item.ID(), ".data")) + assert.False(t, strings.HasSuffix(item.ID(), ".meta")) + itemCount++ _, aok := test.added[item.ID()] @@ -292,6 +298,161 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { } } +// Test prefetch streamItems() for conversations. +func (suite *CollectionUnitSuite) TestPrefetchCollection_Conversations() { + var ( + t = suite.T() + start = time.Now().Add(-1 * time.Second) + statusUpdater = func(*support.ControllerOperationStatus) {} + ) + + fullPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s") + require.NoError(t, err, clues.ToCore(err)) + + locPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s") + require.NoError(t, err, clues.ToCore(err)) + + table := []struct { + name string + added map[string]time.Time + removed map[string]struct{} + }{ + { + name: "no items", + }, + { + name: "only added items", + added: map[string]time.Time{ + "fisher": {}, + "flannigan": {}, + "fitzbog": {}, + }, + }, + { + name: "only removed items", + removed: map[string]struct{}{ + "princess": {}, + "poppy": {}, + "petunia": {}, + }, + }, + { + name: "added and removed items", + added: map[string]time.Time{ + "goblin": {}, + }, + removed: map[string]struct{}{ + "general": {}, + "goose": {}, + "grumbles": {}, + }, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + var ( + t = suite.T() + errs = fault.New(true) + itemCount int + itemMap = map[string]data.Item{} + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + col := &prefetchCollection[models.Conversationable, models.Postable]{ + BaseCollection: data.NewBaseCollection( + fullPath, + nil, + locPath.ToBuilder(), + control.DefaultOptions(), + false, + count.New()), + added: test.added, + contains: container[models.Conversationable]{}, + removed: test.removed, + getAndAugment: &getAndAugmentConversation{}, + stream: make(chan data.Item), + statusUpdater: statusUpdater, + } + + go col.streamItems(ctx, errs) + + for item := range col.stream { + var trimmedID string + + switch { + case strings.HasSuffix(item.ID(), ".data"): + trimmedID = strings.TrimSuffix(item.ID(), ".data") + case strings.HasSuffix(item.ID(), ".meta"): + trimmedID = strings.TrimSuffix(item.ID(), ".meta") + default: + assert.Fail(t, "unexpected item suffix: %s", item.ID()) + } + + itemCount++ + itemMap[item.ID()] = item + + _, aok := test.added[trimmedID] + if aok { + assert.False(t, item.Deleted(), "additions should not be marked as deleted") + } + + _, rok := test.removed[trimmedID] + if rok { + assert.True(t, item.Deleted(), "removals should be marked as deleted") + dimt, ok := item.(data.ItemModTime) + require.True(t, ok, "item implements data.ItemModTime") + assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()") + } + + assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID()) + } + + assert.NoError(t, errs.Failure()) + assert.Equal( + t, + 2*(len(test.added)+len(test.removed)), + itemCount, + "should see all expected items") + + addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...) + for _, id := range addedAndRemoved { + // Should have a .data and a .meta file + d, ok := itemMap[id+".data"] + assert.True(t, ok, "should have data file for %q", id) + + m, ok := itemMap[id+".meta"] + assert.True(t, ok, "should have meta file for %q", id) + + // Meta files should not have item info. + assert.Implements(t, (*data.Item)(nil), m) + + if slices.Contains(maps.Keys(test.removed), id) { + continue + } + + // Mod times should match. Not doing this check for removed items + // since mod time is set to now() for them. + assert.Equal( + t, + d.(data.ItemModTime).ModTime(), + m.(data.ItemModTime).ModTime(), + "item mod time") + + // Read meta file data. The data is of no significance, we just want + // to make sure the file is readable. + r := m.ToReader() + + _, err := io.ReadAll(r) + assert.NoError(t, err, clues.ToCore(err)) + + r.Close() + } + }) + } +} + type getAndAugmentConversation struct { GetItemErr error GetMetaErr error @@ -341,6 +502,7 @@ func (m *getAndAugmentConversation) check(t *testing.T, expected []string) { assert.Equal(t, expected, m.CallIDs, "expected calls") } +// Test lazy streamItems() for conversations. func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() { var ( t = suite.T()