From 647f7326d751eaf9f23d3536b6bf4b5c8d9ccdc8 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Mon, 18 Sep 2023 14:49:53 -0700 Subject: [PATCH] Refactor exchange collection (#4283) No real changes to API or internal structure, but pull out more generic functionality like getting the path/location info for an exchange collection into a separate struct and factor out some soon-to-be common functions --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [x] :broom: Tech Debt/Cleanup #### Issue(s) * #2023 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- .../m365/collection/exchange/backup.go | 28 +- .../m365/collection/exchange/collection.go | 271 ++++++++++-------- .../collection/exchange/collection_test.go | 45 +-- 3 files changed, 197 insertions(+), 147 deletions(-) diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index d0b565ad2..4321a9527 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -189,15 +189,15 @@ func populateCollections( } edc := NewCollection( + NewBaseCollection( + currPath, + prevPath, + locPath, + ctrlOpts, + newDelta.Reset), qp.ProtectedResource.ID(), - currPath, - prevPath, - locPath, - category, bh.itemHandler(), - statusUpdater, - ctrlOpts, - newDelta.Reset) + statusUpdater) collections[cID] = &edc @@ -251,15 +251,15 @@ func populateCollections( } edc := NewCollection( + NewBaseCollection( + nil, // marks the collection as deleted + prevPath, + nil, // tombstones don't need a location + ctrlOpts, + false), qp.ProtectedResource.ID(), - nil, // marks the collection as deleted - prevPath, - nil, // tombstones don't need a location - category, bh.itemHandler(), - statusUpdater, - ctrlOpts, - false) + statusUpdater) collections[id] = &edc } diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index cd9587802..7a7d2455b 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -36,22 +36,29 @@ const ( numberOfRetries = 4 ) -// Collection implements the interface from data.Collection -// Structure holds data for an Exchange application for a single user -type Collection struct { - user string - stream chan data.Item +func NewBaseCollection( + curr, prev path.Path, + location *path.Builder, + ctrlOpts control.Options, + doNotMergeItems bool, +) baseCollection { + return baseCollection{ + ctrl: ctrlOpts, + doNotMergeItems: doNotMergeItems, + fullPath: curr, + locationPath: location, + prevPath: prev, + state: data.StateOf(prev, curr), + } +} - // added is a list of existing item IDs that were added to a container - added map[string]struct{} - // removed is a list of item IDs that were deleted from, or moved out, of a container - removed map[string]struct{} - - getter itemGetterSerializer - - category path.CategoryType - statusUpdater support.StatusUpdater - ctrl control.Options +// baseCollection contains basic functionality like returning path, location, +// and state information. It can be embedded in other implementations to provide +// this functionality. +// +// Functionality like how items are fetched is left to the embedding struct. +type baseCollection struct { + ctrl control.Options // FullPath is the current hierarchical path used by this collection. fullPath path.Path @@ -71,6 +78,92 @@ type Collection struct { doNotMergeItems bool } +// FullPath returns the baseCollection's fullPath []string +func (col *baseCollection) FullPath() path.Path { + return col.fullPath +} + +// LocationPath produces the baseCollection's full path, but with display names +// instead of IDs in the folders. Only populated for Calendars. +func (col *baseCollection) LocationPath() *path.Builder { + return col.locationPath +} + +func (col baseCollection) PreviousPath() path.Path { + return col.prevPath +} + +func (col baseCollection) State() data.CollectionState { + return col.state +} + +func (col baseCollection) DoNotMergeItems() bool { + return col.doNotMergeItems +} + +// updateStatus is a utility function used to send the status update through +// the channel. +func updateStatus( + ctx context.Context, + statusUpdater support.StatusUpdater, + attempted int, + success int, + totalBytes int64, + folderPath string, + err error, +) { + status := support.CreateStatus( + ctx, + support.Backup, + 1, + support.CollectionMetrics{ + Objects: attempted, + Successes: success, + Bytes: totalBytes, + }, + folderPath) + + logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) + + statusUpdater(status) +} + +func getItemAndInfo( + ctx context.Context, + getter itemGetterSerializer, + userID string, + id string, + useImmutableIDs bool, + parentPath string, +) ([]byte, *details.ExchangeInfo, error) { + item, info, err := getter.GetItem( + ctx, + userID, + id, + useImmutableIDs, + fault.New(true)) // temporary way to force a failFast error + if err != nil { + return nil, nil, clues.Wrap(err, "fetching item"). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation) + } + + itemData, err := getter.Serialize(ctx, item, userID, id) + if err != nil { + return nil, nil, clues.Wrap(err, "serializing item").WithClues(ctx) + } + + // In case of mail the size of itemData is calc as- size of body content+size of attachment + // in all other case the size is - total item's serialized size + if info.Size <= 0 { + info.Size = int64(len(itemData)) + } + + info.ParentPath = parentPath + + return itemData, info, nil +} + // NewExchangeDataCollection creates an ExchangeDataCollection. // State of the collection is set as an observation of the current // and previous paths. If the curr path is nil, the state is assumed @@ -78,73 +171,56 @@ type Collection struct { // If both are populated, then state is either moved (if they differ), // or notMoved (if they match). func NewCollection( + bc baseCollection, user string, - curr, prev path.Path, - location *path.Builder, - category path.CategoryType, items itemGetterSerializer, statusUpdater support.StatusUpdater, - ctrlOpts control.Options, - doNotMergeItems bool, ) Collection { collection := Collection{ - added: make(map[string]struct{}, 0), - category: category, - ctrl: ctrlOpts, - stream: make(chan data.Item, collectionChannelBufferSize), - doNotMergeItems: doNotMergeItems, - fullPath: curr, - getter: items, - locationPath: location, - prevPath: prev, - removed: make(map[string]struct{}, 0), - state: data.StateOf(prev, curr), - statusUpdater: statusUpdater, - user: user, + baseCollection: bc, + user: user, + added: map[string]struct{}{}, + removed: map[string]struct{}{}, + getter: items, + statusUpdater: statusUpdater, } return collection } +// Collection implements the interface from data.Collection +// Structure holds data for an Exchange application for a single user +type Collection struct { + baseCollection + + user string + + // added is a list of existing item IDs that were added to a container + added map[string]struct{} + // removed is a list of item IDs that were deleted from, or moved out, of a container + removed map[string]struct{} + + getter itemGetterSerializer + + statusUpdater support.StatusUpdater +} + // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { - go col.streamItems(ctx, errs) - return col.stream -} + stream := make(chan data.Item, collectionChannelBufferSize) + go col.streamItems(ctx, stream, errs) -// FullPath returns the Collection's fullPath []string -func (col *Collection) FullPath() path.Path { - return col.fullPath + return stream } -// LocationPath produces the Collection's full path, but with display names -// instead of IDs in the folders. Only populated for Calendars. -func (col *Collection) LocationPath() *path.Builder { - return col.locationPath -} - -// TODO(ashmrtn): Fill in with previous path once the Controller compares old -// and new folder hierarchies. -func (col Collection) PreviousPath() path.Path { - return col.prevPath -} - -func (col Collection) State() data.CollectionState { - return col.state -} - -func (col Collection) DoNotMergeItems() bool { - return col.doNotMergeItems -} - -// --------------------------------------------------------------------------- -// Items() channel controller -// --------------------------------------------------------------------------- - // streamItems is a utility function that uses col.collectionType to be able to serialize // all the M365IDs defined in the added field. data channel is closed by this function -func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { +func (col *Collection) streamItems( + ctx context.Context, + stream chan<- data.Item, + errs *fault.Bus, +) { var ( success int64 totalBytes int64 @@ -154,11 +230,19 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { user = col.user log = logger.Ctx(ctx).With( "service", path.ExchangeService.String(), - "category", col.category.String()) + "category", col.FullPath().Category().String()) ) defer func() { - col.finishPopulation(ctx, int(success), totalBytes, errs.Failure()) + close(stream) + updateStatus( + ctx, + col.statusUpdater, + len(col.added)+len(col.removed), + int(success), + totalBytes, + col.FullPath().Folder(false), + errs.Failure()) }() if len(col.added)+len(col.removed) > 0 { @@ -182,14 +266,13 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { defer wg.Done() defer func() { <-semaphoreCh }() - col.stream <- &Item{ + stream <- &Item{ id: id, modTime: time.Now().UTC(), // removed items have no modTime entry. deleted: true, } atomic.AddInt64(&success, 1) - atomic.AddInt64(&totalBytes, 0) if colProgress != nil { colProgress <- struct{}{} @@ -197,6 +280,8 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { }(id) } + parentPath := col.LocationPath().String() + // add any new items for id := range col.added { if errs.Failure() != nil { @@ -211,12 +296,13 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { defer wg.Done() defer func() { <-semaphoreCh }() - item, info, err := col.getter.GetItem( + itemData, info, err := getItemAndInfo( ctx, + col.getter, user, id, col.ctrl.ToggleFeatures.ExchangeImmutableIDs, - fault.New(true)) // temporary way to force a failFast error + parentPath) if err != nil { // Don't report errors for deleted items as there's no way for us to // back up data that is gone. Record it as a "success", since there's @@ -232,23 +318,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { return } - data, err := col.getter.Serialize(ctx, item, user, id) - if err != nil { - errs.AddRecoverable(ctx, clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation)) - return - } - - // In case of mail the size of data is calc as- size of body content+size of attachment - // in all other case the size is - total item's serialized size - if info.Size <= 0 { - info.Size = int64(len(data)) - } - - info.ParentPath = col.LocationPath().String() - - col.stream <- &Item{ + stream <- &Item{ id: id, - message: data, + message: itemData, info: info, modTime: info.Modified, } @@ -265,33 +337,6 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { wg.Wait() } -// finishPopulation is a utility function used to close a Collection's data channel -// and to send the status update through the channel. -func (col *Collection) finishPopulation( - ctx context.Context, - success int, - totalBytes int64, - err error, -) { - close(col.stream) - - attempted := len(col.added) + len(col.removed) - status := support.CreateStatus( - ctx, - support.Backup, - 1, - support.CollectionMetrics{ - Objects: attempted, - Successes: success, - Bytes: totalBytes, - }, - col.FullPath().Folder(false)) - - logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) - - col.statusUpdater(status) -} - // Item represents a single item retrieved from exchange type Item struct { id string diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index ed15bfe3e..407096600 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -72,8 +72,10 @@ func (suite *CollectionUnitSuite) TestCollection_NewCollection() { require.NoError(t, err, clues.ToCore(err)) edc := Collection{ - user: name, - fullPath: fullPath, + baseCollection: baseCollection{ + fullPath: fullPath, + }, + user: name, } assert.Equal(t, name, edc.user) assert.Equal(t, fullPath, edc.FullPath()) @@ -125,13 +127,15 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() { t := suite.T() c := NewCollection( + NewBaseCollection( + test.curr, + test.prev, + test.loc, + control.DefaultOptions(), + false), "u", - test.curr, test.prev, test.loc, - 0, mock.DefaultItemGetSerialize(), - nil, - control.DefaultOptions(), - false) + nil) assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.curr, c.fullPath, "full path") assert.Equal(t, test.prev, c.prevPath, "prev path") @@ -191,7 +195,7 @@ func (suite *CollectionUnitSuite) TestGetItemWithRetries() { func (suite *CollectionUnitSuite) TestCollection_streamItems() { var ( t = suite.T() - start = time.Now().Add(-1 * time.Second) + start = time.Now().Add(-time.Second) statusUpdater = func(*support.ControllerOperationStatus) {} ) @@ -250,20 +254,21 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() { ctx, flush := tester.NewContext(t) defer flush() - col := &Collection{ - added: test.added, - removed: test.removed, - ctrl: control.DefaultOptions(), - getter: &mock.ItemGetSerialize{}, - stream: make(chan data.Item), - fullPath: fullPath, - locationPath: locPath.ToBuilder(), - statusUpdater: statusUpdater, - } + col := NewCollection( + NewBaseCollection( + fullPath, + nil, + locPath.ToBuilder(), + control.DefaultOptions(), + false), + "", + &mock.ItemGetSerialize{}, + statusUpdater) - go col.streamItems(ctx, errs) + col.added = test.added + col.removed = test.removed - for item := range col.stream { + for item := range col.Items(ctx, errs) { itemCount++ _, aok := test.added[item.ID()]