Add lazy item reader for groups (#4913)

<!-- PR description -->

Lazy item reader for groups, similar to the one in exchange. The tests are also inspired by the exchange tests.

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/4862

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-12-22 18:31:05 -08:00 committed by GitHub
parent 341a838181
commit f5f65f0cfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 554 additions and 7 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
@ -55,6 +56,10 @@ func updateStatus(
statusUpdater(status)
}
// -----------------------------------------------------------------------------
// prefetchCollection
// -----------------------------------------------------------------------------
type prefetchCollection[C graph.GetIDer, I groupsItemer] struct {
data.BaseCollection
protectedResource string
@ -72,7 +77,6 @@ type prefetchCollection[C graph.GetIDer, I groupsItemer] struct {
statusUpdater support.StatusUpdater
}
// NewCollection creates a prefetchCollection.
// State of the collection is set as an observation of the current
// and previous paths. If the curr path is nil, the state is assumed
// to be deleted. If the prev path is nil, it is assumed newly created.
@ -101,17 +105,11 @@ func NewCollection[C graph.GetIDer, I groupsItemer](
return collection
}
// Items asynchronously starts the producer that fills the data channel
// with M365 group objects, and returns the channel for consumption.
func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
	go col.streamItems(ctx, errs)
	return col.stream
}
// ---------------------------------------------------------------------------
// items() production
// ---------------------------------------------------------------------------
func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) {
var (
streamedItems int64
@ -248,3 +246,203 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
wg.Wait()
}
// -----------------------------------------------------------------------------
// lazyFetchCollection
// -----------------------------------------------------------------------------

// lazyFetchCollection streams backup items whose content is fetched lazily:
// the item data is only downloaded from the service when the consumer reads
// it, instead of being prefetched when the collection is enumerated.
type lazyFetchCollection[C graph.GetIDer, I groupsItemer] struct {
	data.BaseCollection

	// protectedResource identifies the resource owner being backed up.
	protectedResource string

	// stream is the channel on which items are produced for consumers.
	stream chan data.Item

	// contains holds the container these items belong to.
	contains container[C]

	// added is a list of existing item IDs that were added to a container,
	// mapped to each item's modification time.
	added map[string]time.Time

	// removed is a list of item IDs that were deleted from, or moved out, of a container
	removed map[string]struct{}

	// getAndAugment fetches a single item and augments its details info.
	getAndAugment getItemAndAugmentInfoer[C, I]

	statusUpdater support.StatusUpdater
}
// Items kicks off the lazy item producer and hands back the channel it
// feeds. Item content is materialized only when the consumer reads it.
func (col *lazyFetchCollection[C, I]) Items(
	ctx context.Context,
	errs *fault.Bus,
) <-chan data.Item {
	out := col.stream

	go col.streamItems(ctx, errs)

	return out
}
// streamItems pushes one data.Item onto col.stream for every removed and
// added ID, then closes the channel. Removed IDs become deleted-item
// markers; added IDs become lazy items whose content is only fetched when
// read. Goroutine fan-out is bounded by the ItemFetch parallelism option.
func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) {
	var (
		// streamedItems is updated atomically by the producer goroutines.
		streamedItems int64
		wg            sync.WaitGroup
		colProgress   chan<- struct{}
		el            = errs.Local()
	)

	ctx = clues.Add(ctx, "category", col.Category().String())

	// Runs after wg.Wait() below, so every producer has finished before the
	// stream is closed and the final status is reported.
	defer func() {
		close(col.stream)
		logger.Ctx(ctx).Infow(
			"finished stream backup collection items",
			"stats", col.Counter.Values())
		updateStatus(
			ctx,
			col.statusUpdater,
			len(col.added)+len(col.removed),
			streamedItems,
			0,
			col.FullPath().Folder(false),
			errs.Failure())
	}()

	// Only show progress when there is at least one item to report.
	if len(col.added)+len(col.removed) > 0 {
		colProgress = observe.CollectionProgress(
			ctx,
			col.Category().HumanString(),
			col.LocationPath().Elements())
		defer close(colProgress)
	}

	// semaphoreCh bounds the number of concurrently running producers.
	semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch)
	defer close(semaphoreCh)

	// delete all removed items
	for id := range col.removed {
		semaphoreCh <- struct{}{}

		wg.Add(1)

		// id is passed as an argument to avoid loop-variable capture.
		go func(id string) {
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			col.stream <- data.NewDeletedItem(id)

			atomic.AddInt64(&streamedItems, 1)
			col.Counter.Inc(count.StreamItemsRemoved)

			if colProgress != nil {
				colProgress <- struct{}{}
			}
		}(id)
	}

	// add any new items
	for id, modTime := range col.added {
		// NOTE(review): only this loop stops on el.Failure(); removals above
		// are always streamed — confirm the asymmetry is intended.
		if el.Failure() != nil {
			break
		}

		wg.Add(1)

		semaphoreCh <- struct{}{}

		go func(id string, modTime time.Time) {
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			ictx := clues.Add(
				ctx,
				"item_id", id,
				"parent_path", path.LoggableDir(col.LocationPath().String()))

			// The item carries a lazyItemGetter, so no fetch happens here;
			// data is pulled when the consumer reads the item.
			col.stream <- data.NewLazyItemWithInfo(
				ictx,
				&lazyItemGetter[C, I]{
					modTime:       modTime,
					getAndAugment: col.getAndAugment,
					resourceID:    col.protectedResource,
					itemID:        id,
					containerIDs:  col.FullPath().Folders(),
					contains:      col.contains,
					parentPath:    col.LocationPath().String(),
				},
				id,
				modTime,
				col.Counter,
				el)

			atomic.AddInt64(&streamedItems, 1)

			if colProgress != nil {
				colProgress <- struct{}{}
			}
		}(id, modTime)
	}

	wg.Wait()
}
// lazyItemGetter implements the data getter handed to
// data.NewLazyItemWithInfo. It defers fetching and serializing the item
// until the item's reader is actually consumed.
type lazyItemGetter[C graph.GetIDer, I groupsItemer] struct {
	getAndAugment getItemAndAugmentInfoer[C, I]
	resourceID    string
	itemID        string
	parentPath    string
	containerIDs  path.Elements
	// modTime is the modification time previously reported for this item;
	// GetData stamps it onto the returned ItemInfo for details merging.
	modTime  time.Time
	contains container[C]
}
// GetData lazily fetches the item from the service, serializes it to JSON,
// and returns a reader over the serialized bytes along with the item's
// details info.
//
// When the item was deleted between enumeration and fetch, it returns
// delInFlight=true with no data and no error, so the backup skips the item
// instead of failing.
func (lig *lazyItemGetter[C, I]) GetData(
	ctx context.Context,
	errs *fault.Bus,
) (io.ReadCloser, *details.ItemInfo, bool, error) {
	item, info, err := lig.getAndAugment.getItem(
		ctx,
		lig.resourceID,
		lig.containerIDs,
		lig.itemID)
	if err != nil {
		// If an item was deleted then return an empty file so we don't fail
		// the backup. Also return delInFlight as true so that kopia skips
		// adding ItemInfo to details.
		//
		// The item will be deleted from kopia on the next backup when the
		// delta token shows it's removed.
		if graph.IsErrDeletedInFlight(err) {
			logger.CtxErr(ctx, err).Info("item not found")
			return nil, nil, true, nil
		}

		err = clues.WrapWC(ctx, err, "getting item data").Label(fault.LabelForceNoBackupCreation)
		errs.AddRecoverable(ctx, err)

		return nil, nil, false, err
	}

	lig.getAndAugment.augmentItemInfo(info, lig.contains.container)

	ser := kjson.NewJsonSerializationWriter()
	defer ser.Close()

	if err := ser.WriteObjectValue("", item); err != nil {
		err = clues.WrapWC(ctx, err, "writing item to serializer").Label(fault.LabelForceNoBackupCreation)
		errs.AddRecoverable(ctx, err)

		return nil, nil, false, err
	}

	content, err := ser.GetSerializedContent()
	if err != nil {
		err = clues.WrapWC(ctx, err, "serializing item").Label(fault.LabelForceNoBackupCreation)
		errs.AddRecoverable(ctx, err)

		return nil, nil, false, err
	}

	// Record the parent path and the mod time we already told kopia about;
	// both are required for proper details merging.
	info.ParentPath = lig.parentPath
	info.Modified = lig.modTime

	return io.NopCloser(bytes.NewReader(content)),
		&details.ItemInfo{Groups: info},
		false,
		nil
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io"
"slices"
"testing"
"time"
@ -23,6 +24,7 @@ import (
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
type CollectionUnitSuite struct {
@ -268,3 +270,350 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
})
}
}
// getAndAugmentConversation is a mock item getter for conversation posts.
// It records the IDs passed to getItem so tests can verify exactly which
// items had their data lazily fetched.
type getAndAugmentConversation struct {
	// GetItemErr, if set, is returned by every getItem call.
	GetItemErr error
	// CallIDs accumulates the post IDs requested via getItem.
	CallIDs []string
}
// getItem records the requested post ID and returns a stub Post carrying
// that ID, together with the mock's configured error.
//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) getItem(
	_ context.Context,
	_ string,
	_ path.Elements,
	postID string,
) (models.Postable, *details.GroupsInfo, error) {
	post := models.NewPost()
	post.SetId(ptr.To(postID))

	m.CallIDs = append(m.CallIDs, postID)

	return post, &details.GroupsInfo{}, m.GetItemErr
}
// augmentItemInfo is a no-op in this mock.
//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) {
	// no-op
}
// check asserts that the mock saw getItem calls for exactly the expected
// set of IDs. Both slices are sorted in place before comparison; sorting
// (rather than converting to a set) keeps duplicate calls detectable.
func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
	slices.Sort(expected)
	slices.Sort(m.CallIDs)

	assert.Equal(t, expected, m.CallIDs, "expected calls")
}
// TestLazyFetchCollection_Items_LazyFetch verifies that lazyFetchCollection
// streams deleted markers for removed IDs and lazy items for added IDs, and
// that an item's data is only fetched from the (mock) service when its
// reader is actually consumed.
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
	var (
		t = suite.T()
		// Slightly in the past so mod-time comparisons against "now" pass.
		start         = time.Now().Add(-time.Second)
		statusUpdater = func(*support.ControllerOperationStatus) {}
	)

	fullPath, err := path.Build(
		"t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	locPath, err := path.Build(
		"t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	table := []struct {
		name            string
		added           map[string]time.Time
		removed         map[string]struct{}
		expectItemCount int
		// Items we want to trigger lazy reader on.
		expectReads []string
	}{
		{
			name: "no items",
		},
		{
			name: "only added items",
			added: map[string]time.Time{
				"fisher":    start.Add(time.Minute),
				"flannigan": start.Add(2 * time.Minute),
				"fitzbog":   start.Add(3 * time.Minute),
			},
			expectItemCount: 3,
			expectReads: []string{
				"fisher",
				"flannigan",
				"fitzbog",
			},
		},
		{
			name: "only removed items",
			removed: map[string]struct{}{
				"princess": {},
				"poppy":    {},
				"petunia":  {},
			},
			expectItemCount: 3,
		},
		{
			// TODO(pandeyabs): Overlaps between added and removed are deleted
			// by NewCollection caller code. This is a slight deviation from how
			// exchange does it. It's harmless but should be fixed for consistency.
			//
			// Since we are calling NewCollection here directly, we are not testing
			// with overlaps, else those tests would fail. Same behavior exists for
			// prefetch collections.
			name: "added and removed items",
			added: map[string]time.Time{
				"goblin": {},
			},
			removed: map[string]struct{}{
				"general":  {},
				"goose":    {},
				"grumbles": {},
			},
			expectItemCount: 4,
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			var (
				t         = suite.T()
				errs      = fault.New(true)
				itemCount int
			)

			ctx, flush := tester.NewContext(t)
			defer flush()

			getterAugmenter := &getAndAugmentConversation{}
			// At teardown, verify getItem ran only for the items whose data
			// the test actually read.
			defer getterAugmenter.check(t, test.expectReads)

			col := &lazyFetchCollection[models.Conversationable, models.Postable]{
				BaseCollection: data.NewBaseCollection(
					fullPath,
					nil,
					locPath.ToBuilder(),
					control.DefaultOptions(),
					false,
					count.New()),
				added:         test.added,
				contains:      container[models.Conversationable]{},
				removed:       test.removed,
				getAndAugment: getterAugmenter,
				stream:        make(chan data.Item),
				statusUpdater: statusUpdater,
			}

			for item := range col.Items(ctx, errs) {
				itemCount++

				_, rok := test.removed[item.ID()]
				if rok {
					assert.True(t, item.Deleted(), "removals should be marked as deleted")
					dimt, ok := item.(data.ItemModTime)
					require.True(t, ok, "item implements data.ItemModTime")
					assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
				}

				modTime, aok := test.added[item.ID()]
				if !rok && aok {
					// Item's mod time should be what's passed into the collection
					// initializer.
					assert.Implements(t, (*data.ItemModTime)(nil), item)
					assert.Equal(t, modTime, item.(data.ItemModTime).ModTime(), "item mod time")
					assert.False(t, item.Deleted(), "additions should not be marked as deleted")

					// Check if the test wants us to read the item's data so the lazy
					// data fetch is executed.
					if slices.Contains(test.expectReads, item.ID()) {
						r := item.ToReader()

						_, err := io.ReadAll(r)
						assert.NoError(t, err, clues.ToCore(err))

						r.Close()

						assert.Implements(t, (*data.ItemInfo)(nil), item)

						info, err := item.(data.ItemInfo).Info()
						// ItemInfo's mod time should match what was passed into the
						// collection initializer.
						assert.NoError(t, err, clues.ToCore(err))
						assert.Equal(t, modTime, info.Modified(), "ItemInfo mod time")
					}
				}

				assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID())
			}

			assert.NoError(t, errs.Failure())
			assert.Equal(
				t,
				test.expectItemCount,
				itemCount,
				"should see all expected items")
		})
	}
}
// TestLazyItem_GetDataErrors checks that errors raised by the underlying
// item getter surface through the lazy item's reader and its Info() call.
func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() {
	var (
		parentPath = "thread/private/silly cats"
		now        = time.Now()
	)

	table := []struct {
		name              string
		getErr            error
		expectReadErrType error
	}{
		{
			name:              "ReturnsErrorOnGenericGetError",
			getErr:            assert.AnError,
			expectReadErrType: assert.AnError,
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			m := getAndAugmentConversation{
				GetItemErr: test.getErr,
			}

			li := data.NewLazyItemWithInfo(
				ctx,
				&lazyItemGetter[models.Conversationable, models.Postable]{
					resourceID:    "resourceID",
					itemID:        "itemID",
					getAndAugment: &m,
					modTime:       now,
					parentPath:    parentPath,
				},
				"itemID",
				now,
				count.New(),
				fault.New(true))

			assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
			assert.Equal(t, now, li.ModTime(), "item mod time")

			// Reading the item's data triggers the lazy fetch, which fails
			// with the configured error.
			_, err := readers.NewVersionedRestoreReader(li.ToReader())
			assert.ErrorIs(t, err, test.expectReadErrType)

			// Should get some form of error when trying to get info.
			_, err = li.Info()
			assert.Error(t, err, "Info()")
		})
	}
}
// TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight verifies that an item
// deleted between enumeration and fetch yields an empty reader flagged as
// DelInFlight (not an error), and that Info() reports data.ErrNotFound.
func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() {
	var (
		t          = suite.T()
		parentPath = "thread/private/silly cats"
		now        = time.Now()
	)

	ctx, flush := tester.NewContext(t)
	defer flush()

	// The mock reports the item as deleted in flight.
	m := getAndAugmentConversation{
		GetItemErr: graph.ErrDeletedInFlight,
	}

	li := data.NewLazyItemWithInfo(
		ctx,
		&lazyItemGetter[models.Conversationable, models.Postable]{
			resourceID:    "resourceID",
			itemID:        "itemID",
			getAndAugment: &m,
			modTime:       now,
			parentPath:    parentPath,
		},
		"itemID",
		now,
		count.New(),
		fault.New(true))

	assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
	assert.Equal(
		t,
		now,
		li.ModTime(),
		"item mod time")

	r, err := readers.NewVersionedRestoreReader(li.ToReader())
	require.NoError(t, err, clues.ToCore(err))

	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
	assert.True(t, r.Format().DelInFlight)

	readData, err := io.ReadAll(r)
	assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
	assert.Empty(t, readData, "read item data")

	_, err = li.Info()
	assert.ErrorIs(t, err, data.ErrNotFound, "Info() error")
}
// TestLazyItem covers the happy path: reading a lazy item fetches and
// serializes the post, and Info() reflects the parent path and the mod
// time passed at construction.
func (suite *CollectionUnitSuite) TestLazyItem() {
	var (
		t          = suite.T()
		parentPath = "thread/private/silly cats"
		now        = time.Now()
	)

	ctx, flush := tester.NewContext(t)
	defer flush()

	m := getAndAugmentConversation{}

	li := data.NewLazyItemWithInfo(
		ctx,
		&lazyItemGetter[models.Conversationable, models.Postable]{
			resourceID:    "resourceID",
			itemID:        "itemID",
			getAndAugment: &m,
			modTime:       now,
			parentPath:    parentPath,
		},
		"itemID",
		now,
		count.New(),
		fault.New(true))

	assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
	assert.Equal(
		t,
		now,
		li.ModTime(),
		"item mod time")

	r, err := readers.NewVersionedRestoreReader(li.ToReader())
	require.NoError(t, err, clues.ToCore(err))

	assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
	assert.False(t, r.Format().DelInFlight)

	// Reading triggers the lazy fetch; a live item returns non-empty data.
	readData, err := io.ReadAll(r)
	assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
	assert.NotEmpty(t, readData, "read item data")

	info, err := li.Info()
	assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))

	assert.Equal(t, parentPath, info.Groups.ParentPath)
	assert.Equal(t, now, info.Modified())
}