fix item filtering, remove prefetch collection

This commit is contained in:
ryanfkeepers 2024-01-24 12:47:38 -07:00
parent 7e3d5d2074
commit 94b02ed6f3
4 changed files with 15 additions and 239 deletions

View File

@ -131,9 +131,10 @@ func populateCollection[I chatsItemer](
for _, item := range items { for _, item := range items {
if !bh.includeItem(item, scope) { if !bh.includeItem(item, scope) {
cl.Inc(count.SkippedItems) cl.Inc(count.SkippedItems)
} else { continue
includedItems = append(includedItems, item)
} }
includedItems = append(includedItems, item)
} }
cl.Add(count.ItemsAdded, int64(len(includedItems))) cl.Add(count.ItemsAdded, int64(len(includedItems)))
@ -159,8 +160,7 @@ func populateCollection[I chatsItemer](
qp.ProtectedResource.ID(), qp.ProtectedResource.ID(),
includedItems, includedItems,
container, container,
statusUpdater, statusUpdater)
useLazyReader)
return collection, clues.Stack(errs.Failure()).OrNil() return collection, clues.Stack(errs.Failure()).OrNil()
} }

View File

@ -58,6 +58,13 @@ func (bh usersChatsBackupHandler) includeItem(
ch models.Chatable, ch models.Chatable,
scope selectors.TeamsChatsScope, scope selectors.TeamsChatsScope,
) bool { ) bool {
// corner case: many Topics are empty, and empty inputs are automatically
// set to non-matching in the selectors code. This allows us to include
// everything without needing to check the topic value in that case.
if scope.IsAny(selectors.TeamsChatsChat) {
return true
}
return scope.Matches(selectors.TeamsChatsChat, ptr.Val(ch.GetTopic())) return scope.Matches(selectors.TeamsChatsChat, ptr.Val(ch.GetTopic()))
} }

View File

@ -18,7 +18,6 @@ import (
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/errs/core" "github.com/alcionai/corso/src/pkg/errs/core"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
@ -26,10 +25,7 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api/graph" "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
) )
var ( var _ data.BackupCollection = &lazyFetchCollection[chatsItemer]{}
_ data.BackupCollection = &prefetchCollection[chatsItemer]{}
_ data.BackupCollection = &lazyFetchCollection[chatsItemer]{}
)
const ( const (
collectionChannelBufferSize = 1000 collectionChannelBufferSize = 1000
@ -63,24 +59,6 @@ func updateStatus(
statusUpdater(status) statusUpdater(status)
} }
// -----------------------------------------------------------------------------
// prefetchCollection
// -----------------------------------------------------------------------------
type prefetchCollection[I chatsItemer] struct {
data.BaseCollection
protectedResource string
stream chan data.Item
contains container[I]
items []I
getAndAugment getItemAndAugmentInfoer[I]
statusUpdater support.StatusUpdater
}
// State of the collection is set as an observation of the current // State of the collection is set as an observation of the current
// and previous paths. If the curr path is nil, the state is assumed // and previous paths. If the curr path is nil, the state is assumed
// to be deleted. If the prev path is nil, it is assumed newly created. // to be deleted. If the prev path is nil, it is assumed newly created.
@ -93,21 +71,8 @@ func NewCollection[I chatsItemer](
items []I, items []I,
contains container[I], contains container[I],
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
useLazyReader bool,
) data.BackupCollection { ) data.BackupCollection {
if useLazyReader { return &lazyFetchCollection[I]{
return &lazyFetchCollection[I]{
BaseCollection: baseCol,
items: items,
contains: contains,
getAndAugment: getAndAugment,
statusUpdater: statusUpdater,
stream: make(chan data.Item, collectionChannelBufferSize),
protectedResource: protectedResource,
}
}
return &prefetchCollection[I]{
BaseCollection: baseCol, BaseCollection: baseCol,
items: items, items: items,
contains: contains, contains: contains,
@ -118,128 +83,6 @@ func NewCollection[I chatsItemer](
} }
} }
func (col *prefetchCollection[I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
go col.streamItems(ctx, errs)
return col.stream
}
func (col *prefetchCollection[I]) streamItems(ctx context.Context, errs *fault.Bus) {
var (
streamedItems int64
totalBytes int64
wg sync.WaitGroup
progressMessage chan<- struct{}
el = errs.Local()
)
ctx = clues.Add(ctx, "category", col.Category().String())
defer func() {
close(col.stream)
logger.Ctx(ctx).Infow(
"finished stream backup collection items",
"stats", col.Counter.Values())
updateStatus(
ctx,
col.statusUpdater,
len(col.items),
streamedItems,
totalBytes,
col.FullPath().Folder(false),
errs.Failure())
}()
if len(col.items) > 0 {
progressMessage = observe.CollectionProgress(
ctx,
col.Category().HumanString(),
col.LocationPath().Elements())
defer close(progressMessage)
}
semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch)
defer close(semaphoreCh)
// add any new items
for _, item := range col.items {
if el.Failure() != nil {
break
}
wg.Add(1)
semaphoreCh <- struct{}{}
itemID := ptr.Val(item.GetId())
go func(id string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
writer := kjson.NewJsonSerializationWriter()
defer writer.Close()
item, info, err := col.getAndAugment.getItem(
ctx,
col.protectedResource,
id)
if err != nil {
err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
col.getAndAugment.augmentItemInfo(info, col.contains.container)
if err := writer.WriteObjectValue("", item); err != nil {
err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
itemData, err := writer.GetSerializedContent()
if err != nil {
err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
info.ParentPath = col.LocationPath().String()
storeItem, err := data.NewPrefetchedItemWithInfo(
io.NopCloser(bytes.NewReader(itemData)),
id,
details.ItemInfo{TeamsChats: info})
if err != nil {
err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
col.stream <- storeItem
atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, int64(info.Chat.MessageCount))
if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
}
col.Counter.Add(count.StreamBytesAdded, int64(info.Chat.MessageCount))
if progressMessage != nil {
progressMessage <- struct{}{}
}
}(itemID)
}
wg.Wait()
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// lazyFetchCollection // lazyFetchCollection
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -133,14 +133,13 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
"g", "g",
nil, nil,
container[models.Chatable]{}, container[models.Chatable]{},
nil, nil)
false)
assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.expect, c.State(), "collection state")
assert.Equal(t, test.curr, c.FullPath(), "full path") assert.Equal(t, test.curr, c.FullPath(), "full path")
assert.Equal(t, test.prev, c.PreviousPath(), "prev path") assert.Equal(t, test.prev, c.PreviousPath(), "prev path")
prefetch, ok := c.(*prefetchCollection[models.Chatable]) prefetch, ok := c.(*lazyFetchCollection[models.Chatable])
require.True(t, ok, "collection type") require.True(t, ok, "collection type")
assert.Equal(t, test.loc, prefetch.LocationPath(), "location path") assert.Equal(t, test.loc, prefetch.LocationPath(), "location path")
@ -170,79 +169,6 @@ func (getAndAugmentChat) augmentItemInfo(*details.TeamsChatsInfo, models.Chatabl
// no-op // no-op
} }
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
var (
t = suite.T()
statusUpdater = func(*support.ControllerOperationStatus) {}
)
fullPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf")
require.NoError(t, err, clues.ToCore(err))
locPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf")
require.NoError(t, err, clues.ToCore(err))
table := []struct {
name string
items []models.Chatable
}{
{
name: "no items",
},
{
name: "items",
items: testdata.StubChats("fisher", "flannigan", "fitzbog"),
},
}
for _, test := range table {
suite.Run(test.name, func() {
var (
t = suite.T()
errs = fault.New(true)
itemCount int
)
ctx, flush := tester.NewContext(t)
defer flush()
col := &prefetchCollection[models.Chatable]{
BaseCollection: data.NewBaseCollection(
fullPath,
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false,
count.New()),
items: test.items,
contains: container[models.Chatable]{},
getAndAugment: getAndAugmentChat{},
stream: make(chan data.Item),
statusUpdater: statusUpdater,
}
go col.streamItems(ctx, errs)
for item := range col.stream {
itemCount++
ok := slices.ContainsFunc(test.items, func(mc models.Chatable) bool {
return ptr.Val(mc.GetId()) == item.ID()
})
require.True(t, ok, "item must be either added or removed: %q", item.ID())
assert.False(t, item.Deleted(), "additions should not be marked as deleted")
}
assert.NoError(t, errs.Failure())
assert.Equal(
t,
len(test.items),
itemCount,
"should see all expected items")
})
}
}
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
var ( var (
t = suite.T() t = suite.T()