Refactor exchange collection (#4283)

No real changes to API or internal structure,
but pull out more generic functionality like
getting the path/location info for an
exchange collection into a separate struct
and factor out some soon-to-be common
functions

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #2023

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-09-18 14:49:53 -07:00 committed by GitHub
parent 8f7070ffac
commit 647f7326d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 197 additions and 147 deletions

View File

@ -189,15 +189,15 @@ func populateCollections(
}
edc := NewCollection(
NewBaseCollection(
currPath,
prevPath,
locPath,
ctrlOpts,
newDelta.Reset),
qp.ProtectedResource.ID(),
currPath,
prevPath,
locPath,
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
newDelta.Reset)
statusUpdater)
collections[cID] = &edc
@ -251,15 +251,15 @@ func populateCollections(
}
edc := NewCollection(
NewBaseCollection(
nil, // marks the collection as deleted
prevPath,
nil, // tombstones don't need a location
ctrlOpts,
false),
qp.ProtectedResource.ID(),
nil, // marks the collection as deleted
prevPath,
nil, // tombstones don't need a location
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
false)
statusUpdater)
collections[id] = &edc
}

View File

@ -36,22 +36,29 @@ const (
numberOfRetries = 4
)
// Collection implements the interface from data.Collection
// Structure holds data for an Exchange application for a single user
type Collection struct {
user string
stream chan data.Item
func NewBaseCollection(
curr, prev path.Path,
location *path.Builder,
ctrlOpts control.Options,
doNotMergeItems bool,
) baseCollection {
return baseCollection{
ctrl: ctrlOpts,
doNotMergeItems: doNotMergeItems,
fullPath: curr,
locationPath: location,
prevPath: prev,
state: data.StateOf(prev, curr),
}
}
// added is a list of existing item IDs that were added to a container
added map[string]struct{}
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
getter itemGetterSerializer
category path.CategoryType
statusUpdater support.StatusUpdater
ctrl control.Options
// baseCollection contains basic functionality like returning path, location,
// and state information. It can be embedded in other implementations to provide
// this functionality.
//
// Functionality like how items are fetched is left to the embedding struct.
type baseCollection struct {
ctrl control.Options
// FullPath is the current hierarchical path used by this collection.
fullPath path.Path
@ -71,6 +78,92 @@ type Collection struct {
doNotMergeItems bool
}
// FullPath returns the baseCollection's fullPath []string
func (col *baseCollection) FullPath() path.Path {
return col.fullPath
}
// LocationPath produces the baseCollection's full path, but with display names
// instead of IDs in the folders. Only populated for Calendars.
func (col *baseCollection) LocationPath() *path.Builder {
return col.locationPath
}
func (col baseCollection) PreviousPath() path.Path {
return col.prevPath
}
func (col baseCollection) State() data.CollectionState {
return col.state
}
func (col baseCollection) DoNotMergeItems() bool {
return col.doNotMergeItems
}
// updateStatus is a utility function used to send the status update through
// the channel.
func updateStatus(
ctx context.Context,
statusUpdater support.StatusUpdater,
attempted int,
success int,
totalBytes int64,
folderPath string,
err error,
) {
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: success,
Bytes: totalBytes,
},
folderPath)
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
statusUpdater(status)
}
func getItemAndInfo(
ctx context.Context,
getter itemGetterSerializer,
userID string,
id string,
useImmutableIDs bool,
parentPath string,
) ([]byte, *details.ExchangeInfo, error) {
item, info, err := getter.GetItem(
ctx,
userID,
id,
useImmutableIDs,
fault.New(true)) // temporary way to force a failFast error
if err != nil {
return nil, nil, clues.Wrap(err, "fetching item").
WithClues(ctx).
Label(fault.LabelForceNoBackupCreation)
}
itemData, err := getter.Serialize(ctx, item, userID, id)
if err != nil {
return nil, nil, clues.Wrap(err, "serializing item").WithClues(ctx)
}
// In case of mail the size of itemData is calc as- size of body content+size of attachment
// in all other case the size is - total item's serialized size
if info.Size <= 0 {
info.Size = int64(len(itemData))
}
info.ParentPath = parentPath
return itemData, info, nil
}
// NewExchangeDataCollection creates an ExchangeDataCollection.
// State of the collection is set as an observation of the current
// and previous paths. If the curr path is nil, the state is assumed
@ -78,73 +171,56 @@ type Collection struct {
// If both are populated, then state is either moved (if they differ),
// or notMoved (if they match).
func NewCollection(
bc baseCollection,
user string,
curr, prev path.Path,
location *path.Builder,
category path.CategoryType,
items itemGetterSerializer,
statusUpdater support.StatusUpdater,
ctrlOpts control.Options,
doNotMergeItems bool,
) Collection {
collection := Collection{
added: make(map[string]struct{}, 0),
category: category,
ctrl: ctrlOpts,
stream: make(chan data.Item, collectionChannelBufferSize),
doNotMergeItems: doNotMergeItems,
fullPath: curr,
getter: items,
locationPath: location,
prevPath: prev,
removed: make(map[string]struct{}, 0),
state: data.StateOf(prev, curr),
statusUpdater: statusUpdater,
user: user,
baseCollection: bc,
user: user,
added: map[string]struct{}{},
removed: map[string]struct{}{},
getter: items,
statusUpdater: statusUpdater,
}
return collection
}
// Collection implements the interface from data.Collection
// Structure holds data for an Exchange application for a single user
type Collection struct {
baseCollection
user string
// added is a list of existing item IDs that were added to a container
added map[string]struct{}
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
getter itemGetterSerializer
statusUpdater support.StatusUpdater
}
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
go col.streamItems(ctx, errs)
return col.stream
}
stream := make(chan data.Item, collectionChannelBufferSize)
go col.streamItems(ctx, stream, errs)
// FullPath returns the Collection's fullPath []string
func (col *Collection) FullPath() path.Path {
return col.fullPath
return stream
}
// LocationPath produces the Collection's full path, but with display names
// instead of IDs in the folders. Only populated for Calendars.
func (col *Collection) LocationPath() *path.Builder {
return col.locationPath
}
// TODO(ashmrtn): Fill in with previous path once the Controller compares old
// and new folder hierarchies.
func (col Collection) PreviousPath() path.Path {
return col.prevPath
}
func (col Collection) State() data.CollectionState {
return col.state
}
func (col Collection) DoNotMergeItems() bool {
return col.doNotMergeItems
}
// ---------------------------------------------------------------------------
// Items() channel controller
// ---------------------------------------------------------------------------
// streamItems is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the added field. data channel is closed by this function
func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
func (col *Collection) streamItems(
ctx context.Context,
stream chan<- data.Item,
errs *fault.Bus,
) {
var (
success int64
totalBytes int64
@ -154,11 +230,19 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
user = col.user
log = logger.Ctx(ctx).With(
"service", path.ExchangeService.String(),
"category", col.category.String())
"category", col.FullPath().Category().String())
)
defer func() {
col.finishPopulation(ctx, int(success), totalBytes, errs.Failure())
close(stream)
updateStatus(
ctx,
col.statusUpdater,
len(col.added)+len(col.removed),
int(success),
totalBytes,
col.FullPath().Folder(false),
errs.Failure())
}()
if len(col.added)+len(col.removed) > 0 {
@ -182,14 +266,13 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
defer wg.Done()
defer func() { <-semaphoreCh }()
col.stream <- &Item{
stream <- &Item{
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
}
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, 0)
if colProgress != nil {
colProgress <- struct{}{}
@ -197,6 +280,8 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
}(id)
}
parentPath := col.LocationPath().String()
// add any new items
for id := range col.added {
if errs.Failure() != nil {
@ -211,12 +296,13 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
defer wg.Done()
defer func() { <-semaphoreCh }()
item, info, err := col.getter.GetItem(
itemData, info, err := getItemAndInfo(
ctx,
col.getter,
user,
id,
col.ctrl.ToggleFeatures.ExchangeImmutableIDs,
fault.New(true)) // temporary way to force a failFast error
parentPath)
if err != nil {
// Don't report errors for deleted items as there's no way for us to
// back up data that is gone. Record it as a "success", since there's
@ -232,23 +318,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
return
}
data, err := col.getter.Serialize(ctx, item, user, id)
if err != nil {
errs.AddRecoverable(ctx, clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation))
return
}
// In case of mail the size of data is calc as- size of body content+size of attachment
// in all other case the size is - total item's serialized size
if info.Size <= 0 {
info.Size = int64(len(data))
}
info.ParentPath = col.LocationPath().String()
col.stream <- &Item{
stream <- &Item{
id: id,
message: data,
message: itemData,
info: info,
modTime: info.Modified,
}
@ -265,33 +337,6 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
wg.Wait()
}
// finishPopulation is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(
ctx context.Context,
success int,
totalBytes int64,
err error,
) {
close(col.stream)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: success,
Bytes: totalBytes,
},
col.FullPath().Folder(false))
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
col.statusUpdater(status)
}
// Item represents a single item retrieved from exchange
type Item struct {
id string

View File

@ -72,8 +72,10 @@ func (suite *CollectionUnitSuite) TestCollection_NewCollection() {
require.NoError(t, err, clues.ToCore(err))
edc := Collection{
user: name,
fullPath: fullPath,
baseCollection: baseCollection{
fullPath: fullPath,
},
user: name,
}
assert.Equal(t, name, edc.user)
assert.Equal(t, fullPath, edc.FullPath())
@ -125,13 +127,15 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
t := suite.T()
c := NewCollection(
NewBaseCollection(
test.curr,
test.prev,
test.loc,
control.DefaultOptions(),
false),
"u",
test.curr, test.prev, test.loc,
0,
mock.DefaultItemGetSerialize(),
nil,
control.DefaultOptions(),
false)
nil)
assert.Equal(t, test.expect, c.State(), "collection state")
assert.Equal(t, test.curr, c.fullPath, "full path")
assert.Equal(t, test.prev, c.prevPath, "prev path")
@ -191,7 +195,7 @@ func (suite *CollectionUnitSuite) TestGetItemWithRetries() {
func (suite *CollectionUnitSuite) TestCollection_streamItems() {
var (
t = suite.T()
start = time.Now().Add(-1 * time.Second)
start = time.Now().Add(-time.Second)
statusUpdater = func(*support.ControllerOperationStatus) {}
)
@ -250,20 +254,21 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() {
ctx, flush := tester.NewContext(t)
defer flush()
col := &Collection{
added: test.added,
removed: test.removed,
ctrl: control.DefaultOptions(),
getter: &mock.ItemGetSerialize{},
stream: make(chan data.Item),
fullPath: fullPath,
locationPath: locPath.ToBuilder(),
statusUpdater: statusUpdater,
}
col := NewCollection(
NewBaseCollection(
fullPath,
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false),
"",
&mock.ItemGetSerialize{},
statusUpdater)
go col.streamItems(ctx, errs)
col.added = test.added
col.removed = test.removed
for item := range col.stream {
for item := range col.Items(ctx, errs) {
itemCount++
_, aok := test.added[item.ID()]