From 94b02ed6f335df608c1bf1db25c7986c48c99724 Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Wed, 24 Jan 2024 12:47:38 -0700 Subject: [PATCH] fix item filtering, remove prefetch coll --- .../m365/collection/teamsChats/backup.go | 8 +- .../collection/teamsChats/chat_handler.go | 7 + .../m365/collection/teamsChats/collection.go | 161 +----------------- .../collection/teamsChats/collection_test.go | 78 +-------- 4 files changed, 15 insertions(+), 239 deletions(-) diff --git a/src/internal/m365/collection/teamsChats/backup.go b/src/internal/m365/collection/teamsChats/backup.go index 5908dbc6e..2a464d322 100644 --- a/src/internal/m365/collection/teamsChats/backup.go +++ b/src/internal/m365/collection/teamsChats/backup.go @@ -131,9 +131,10 @@ func populateCollection[I chatsItemer]( for _, item := range items { if !bh.includeItem(item, scope) { cl.Inc(count.SkippedItems) - } else { - includedItems = append(includedItems, item) + continue } + + includedItems = append(includedItems, item) } cl.Add(count.ItemsAdded, int64(len(includedItems))) @@ -159,8 +160,7 @@ func populateCollection[I chatsItemer]( qp.ProtectedResource.ID(), includedItems, container, - statusUpdater, - useLazyReader) + statusUpdater) return collection, clues.Stack(errs.Failure()).OrNil() } diff --git a/src/internal/m365/collection/teamsChats/chat_handler.go b/src/internal/m365/collection/teamsChats/chat_handler.go index a6ad0341a..93ef8316a 100644 --- a/src/internal/m365/collection/teamsChats/chat_handler.go +++ b/src/internal/m365/collection/teamsChats/chat_handler.go @@ -58,6 +58,13 @@ func (bh usersChatsBackupHandler) includeItem( ch models.Chatable, scope selectors.TeamsChatsScope, ) bool { + // corner case: many Topics are empty, and empty inputs are automatically + // set to non-matching in the selectors code. This allows us to include + // everything without needing to check the topic value in that case. + if scope.IsAny(selectors.TeamsChatsChat) { + return true + } + return scope.Matches(selectors.TeamsChatsChat, ptr.Val(ch.GetTopic())) } diff --git a/src/internal/m365/collection/teamsChats/collection.go b/src/internal/m365/collection/teamsChats/collection.go index d9bc75ff1..a7f25c601 100644 --- a/src/internal/m365/collection/teamsChats/collection.go +++ b/src/internal/m365/collection/teamsChats/collection.go @@ -18,7 +18,6 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" - "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/errs/core" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" @@ -26,10 +25,7 @@ import ( "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) -var ( - _ data.BackupCollection = &prefetchCollection[chatsItemer]{} - _ data.BackupCollection = &lazyFetchCollection[chatsItemer]{} -) +var _ data.BackupCollection = &lazyFetchCollection[chatsItemer]{} const ( collectionChannelBufferSize = 1000 @@ -63,24 +59,6 @@ func updateStatus( statusUpdater(status) } -// ----------------------------------------------------------------------------- -// prefetchCollection -// ----------------------------------------------------------------------------- - -type prefetchCollection[I chatsItemer] struct { - data.BaseCollection - protectedResource string - stream chan data.Item - - contains container[I] - - items []I - - getAndAugment getItemAndAugmentInfoer[I] - - statusUpdater support.StatusUpdater -} - // State of the collection is set as an observation of the current // and previous paths. If the curr path is nil, the state is assumed // to be deleted. If the prev path is nil, it is assumed newly created. @@ -93,21 +71,8 @@ func NewCollection[I chatsItemer]( items []I, contains container[I], statusUpdater support.StatusUpdater, - useLazyReader bool, ) data.BackupCollection { - if useLazyReader { - return &lazyFetchCollection[I]{ - BaseCollection: baseCol, - items: items, - contains: contains, - getAndAugment: getAndAugment, - statusUpdater: statusUpdater, - stream: make(chan data.Item, collectionChannelBufferSize), - protectedResource: protectedResource, - } - } - - return &prefetchCollection[I]{ + return &lazyFetchCollection[I]{ BaseCollection: baseCol, items: items, contains: contains, @@ -118,128 +83,6 @@ func NewCollection[I chatsItemer]( } } -func (col *prefetchCollection[I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { - go col.streamItems(ctx, errs) - return col.stream -} - -func (col *prefetchCollection[I]) streamItems(ctx context.Context, errs *fault.Bus) { - var ( - streamedItems int64 - totalBytes int64 - wg sync.WaitGroup - progressMessage chan<- struct{} - el = errs.Local() - ) - - ctx = clues.Add(ctx, "category", col.Category().String()) - - defer func() { - close(col.stream) - logger.Ctx(ctx).Infow( - "finished stream backup collection items", - "stats", col.Counter.Values()) - - updateStatus( - ctx, - col.statusUpdater, - len(col.items), - streamedItems, - totalBytes, - col.FullPath().Folder(false), - errs.Failure()) - }() - - if len(col.items) > 0 { - progressMessage = observe.CollectionProgress( - ctx, - col.Category().HumanString(), - col.LocationPath().Elements()) - defer close(progressMessage) - } - - semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch) - defer close(semaphoreCh) - - // add any new items - for _, item := range col.items { - if el.Failure() != nil { - break - } - - wg.Add(1) - semaphoreCh <- struct{}{} - - itemID := ptr.Val(item.GetId()) - - go func(id string) { - defer wg.Done() - defer func() { <-semaphoreCh }() - - writer := kjson.NewJsonSerializationWriter() - defer writer.Close() - - item, info, err := col.getAndAugment.getItem( - ctx, - col.protectedResource, - id) - if err != nil { - err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) - - return - } - - col.getAndAugment.augmentItemInfo(info, col.contains.container) - - if err := writer.WriteObjectValue("", item); err != nil { - err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) - - return - } - - itemData, err := writer.GetSerializedContent() - if err != nil { - err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) - - return - } - - info.ParentPath = col.LocationPath().String() - - storeItem, err := data.NewPrefetchedItemWithInfo( - io.NopCloser(bytes.NewReader(itemData)), - id, - details.ItemInfo{TeamsChats: info}) - if err != nil { - err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) - el.AddRecoverable(ctx, err) - - return - } - - col.stream <- storeItem - - atomic.AddInt64(&streamedItems, 1) - atomic.AddInt64(&totalBytes, int64(info.Chat.MessageCount)) - - if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { - logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) - } - - col.Counter.Add(count.StreamBytesAdded, int64(info.Chat.MessageCount)) - - if progressMessage != nil { - progressMessage <- struct{}{} - } - }(itemID) - } - - wg.Wait() -} - // ----------------------------------------------------------------------------- // lazyFetchCollection // ----------------------------------------------------------------------------- diff --git a/src/internal/m365/collection/teamsChats/collection_test.go b/src/internal/m365/collection/teamsChats/collection_test.go index cda6908a7..15b0dac77 100644 --- a/src/internal/m365/collection/teamsChats/collection_test.go +++ b/src/internal/m365/collection/teamsChats/collection_test.go @@ -133,14 +133,13 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() { "g", nil, container[models.Chatable]{}, - nil, - false) + nil) assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.curr, c.FullPath(), "full path") assert.Equal(t, test.prev, c.PreviousPath(), "prev path") - prefetch, ok := c.(*prefetchCollection[models.Chatable]) + prefetch, ok := c.(*lazyFetchCollection[models.Chatable]) require.True(t, ok, "collection type") assert.Equal(t, test.loc, prefetch.LocationPath(), "location path") @@ -170,79 +169,6 @@ func (getAndAugmentChat) augmentItemInfo(*details.TeamsChatsInfo, models.Chatabl // no-op } -func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { - var ( - t = suite.T() - statusUpdater = func(*support.ControllerOperationStatus) {} - ) - - fullPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf") - require.NoError(t, err, clues.ToCore(err)) - - locPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf") - require.NoError(t, err, clues.ToCore(err)) - - table := []struct { - name string - items []models.Chatable - }{ - { - name: "no items", - }, - { - name: "items", - items: testdata.StubChats("fisher", "flannigan", "fitzbog"), - }, - } - for _, test := range table { - suite.Run(test.name, func() { - var ( - t = suite.T() - errs = fault.New(true) - itemCount int - ) - - ctx, flush := tester.NewContext(t) - defer flush() - - col := &prefetchCollection[models.Chatable]{ - BaseCollection: data.NewBaseCollection( - fullPath, - nil, - locPath.ToBuilder(), - control.DefaultOptions(), - false, - count.New()), - items: test.items, - contains: container[models.Chatable]{}, - getAndAugment: getAndAugmentChat{}, - stream: make(chan data.Item), - statusUpdater: statusUpdater, - } - - go col.streamItems(ctx, errs) - - for item := range col.stream { - itemCount++ - - ok := slices.ContainsFunc(test.items, func(mc models.Chatable) bool { - return ptr.Val(mc.GetId()) == item.ID() - }) - - require.True(t, ok, "item must be either added or removed: %q", item.ID()) - assert.False(t, item.Deleted(), "additions should not be marked as deleted") - } - - assert.NoError(t, errs.Failure()) - assert.Equal( - t, - len(test.items), - itemCount, - "should see all expected items") - }) - } -} - func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { var ( t = suite.T()