From 127570953a58a53d69e3cf4331a1c20183803227 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 22 Dec 2023 15:18:58 -0800 Subject: [PATCH] Add prefetch collection for groups (#4906) Minor refactor before introducing `lazyFetchCollection` for groups. We'll utilize `lazyFetchCollection` for group mailboxes and will continue to use `prefetchCollection` for channels. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [x] :broom: Tech Debt/Cleanup #### Issue(s) * https://github.com/alcionai/corso/issues/4862 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/m365/collection/groups/backup.go | 8 +++---- .../m365/collection/groups/collection.go | 21 +++++++++--------- .../m365/collection/groups/collection_test.go | 22 +++++++++---------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index f5f283ac3..a739cd895 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -4,7 +4,6 @@ import ( "context" "github.com/alcionai/clues" - "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/ptr" @@ -174,10 +173,9 @@ func populateCollections[C graph.GetIDer, I groupsItemer]( continue } - added := str.SliceToMap(maps.Keys(addAndRem.Added)) removed := str.SliceToMap(addAndRem.Removed) - cl.Add(count.ItemsAdded, int64(len(added))) + cl.Add(count.ItemsAdded, int64(len(addAndRem.Added))) cl.Add(count.ItemsRemoved, int64(len(removed))) if len(addAndRem.DU.URL) > 0 { @@ -198,7 +196,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer]( // deleted and then restored will have a different ID than they did // originally. for remove := range removed { - delete(added, remove) + delete(addAndRem.Added, remove) } edc := NewCollection( @@ -211,7 +209,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer]( cl), bh, qp.ProtectedResource.ID(), - added, + addAndRem.Added, removed, c, statusUpdater) diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index a3a72479d..5029f5cf5 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -6,6 +6,7 @@ import ( "io" "sync" "sync/atomic" + "time" "github.com/alcionai/clues" kjson "github.com/microsoft/kiota-serialization-json-go" @@ -20,14 +21,14 @@ import ( "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) -var _ data.BackupCollection = &Collection[graph.GetIDer, groupsItemer]{} +var _ data.BackupCollection = &prefetchCollection[graph.GetIDer, groupsItemer]{} const ( collectionChannelBufferSize = 1000 numberOfRetries = 4 ) -type Collection[C graph.GetIDer, I groupsItemer] struct { +type prefetchCollection[C graph.GetIDer, I groupsItemer] struct { data.BaseCollection protectedResource string stream chan data.Item @@ -35,7 +36,7 @@ type Collection[C graph.GetIDer, I groupsItemer] struct { contains container[C] // added is a list of existing item IDs that were added to a container - added map[string]struct{} + added map[string]time.Time // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} @@ -54,12 +55,12 @@ func NewCollection[C graph.GetIDer, I groupsItemer]( baseCol data.BaseCollection, getAndAugment getItemAndAugmentInfoer[C, I], protectedResource string, - added map[string]struct{}, + added map[string]time.Time, removed map[string]struct{}, contains container[C], statusUpdater support.StatusUpdater, -) Collection[C, I] { - collection := Collection[C, I]{ +) prefetchCollection[C, I] { + collection := prefetchCollection[C, I]{ BaseCollection: baseCol, added: added, contains: contains, @@ -75,7 +76,7 @@ func NewCollection[C graph.GetIDer, I groupsItemer]( // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel -func (col *Collection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { +func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { go col.streamItems(ctx, errs) return col.stream } @@ -84,7 +85,7 @@ func (col *Collection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan // items() production // --------------------------------------------------------------------------- -func (col *Collection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { +func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { var ( streamedItems int64 totalBytes int64 @@ -212,9 +213,9 @@ func (col *Collection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { wg.Wait() } -// finishPopulation is a utility function used to close a Collection's data channel +// finishPopulation is a utility function used to close a collection's data channel // and to send the status update through the channel. -func (col *Collection[C, I]) finishPopulation( +func (col *prefetchCollection[C, I]) finishPopulation( ctx context.Context, streamedItems, totalBytes int64, err error, diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index c84724916..0bd298f10 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -162,7 +162,7 @@ func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.C // no-op } -func (suite *CollectionUnitSuite) TestCollection_streamItems() { +func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { var ( t = suite.T() start = time.Now().Add(-1 * time.Second) @@ -177,26 +177,22 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() { table := []struct { name string - added map[string]struct{} + added map[string]time.Time removed map[string]struct{} }{ { - name: "no items", - added: map[string]struct{}{}, - removed: map[string]struct{}{}, + name: "no items", }, { name: "only added items", - added: map[string]struct{}{ + added: map[string]time.Time{ "fisher": {}, "flannigan": {}, "fitzbog": {}, }, - removed: map[string]struct{}{}, }, { - name: "only removed items", - added: map[string]struct{}{}, + name: "only removed items", removed: map[string]struct{}{ "princess": {}, "poppy": {}, @@ -204,8 +200,10 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() { }, }, { - name: "added and removed items", - added: map[string]struct{}{}, + name: "added and removed items", + added: map[string]time.Time{ + "goblin": {}, + }, removed: map[string]struct{}{ "general": {}, "goose": {}, @@ -224,7 +222,7 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() { ctx, flush := tester.NewContext(t) defer flush() - col := &Collection[models.Channelable, models.ChatMessageable]{ + col := &prefetchCollection[models.Channelable, models.ChatMessageable]{ BaseCollection: data.NewBaseCollection( fullPath, nil,