diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index b8ee2c8b2..e24b46859 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) @@ -55,6 +56,10 @@ func updateStatus( statusUpdater(status) } +// ----------------------------------------------------------------------------- +// prefetchCollection +// ----------------------------------------------------------------------------- + type prefetchCollection[C graph.GetIDer, I groupsItemer] struct { data.BaseCollection protectedResource string @@ -72,7 +77,6 @@ type prefetchCollection[C graph.GetIDer, I groupsItemer] struct { statusUpdater support.StatusUpdater } -// NewExchangeDataCollection creates an ExchangeDataCollection. // State of the collection is set as an observation of the current // and previous paths. If the curr path is nil, the state is assumed // to be deleted. If the prev path is nil, it is assumed newly created. 
@@ -101,17 +105,11 @@ func NewCollection[C graph.GetIDer, I groupsItemer]( return collection } -// Items utility function to asynchronously execute process to fill data channel with -// M365 exchange objects and returns the data channel func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { go col.streamItems(ctx, errs) return col.stream } -// --------------------------------------------------------------------------- -// items() production -// --------------------------------------------------------------------------- - func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { var ( streamedItems int64 @@ -248,3 +246,203 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul wg.Wait() } + +// ----------------------------------------------------------------------------- +// lazyFetchCollection +// ----------------------------------------------------------------------------- + +type lazyFetchCollection[C graph.GetIDer, I groupsItemer] struct { + data.BaseCollection + protectedResource string + stream chan data.Item + + contains container[C] + + // added is a list of existing item IDs that were added to a container + added map[string]time.Time + // removed is a list of item IDs that were deleted from, or moved out, of a container + removed map[string]struct{} + + getAndAugment getItemAndAugmentInfoer[C, I] + + statusUpdater support.StatusUpdater +} + +func (col *lazyFetchCollection[C, I]) Items( + ctx context.Context, + errs *fault.Bus, +) <-chan data.Item { + go col.streamItems(ctx, errs) + return col.stream +} + +func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) { + var ( + streamedItems int64 + wg sync.WaitGroup + colProgress chan<- struct{} + el = errs.Local() + ) + + ctx = clues.Add(ctx, "category", col.Category().String()) + + defer func() { + close(col.stream) + logger.Ctx(ctx).Infow( + "finished stream backup 
collection items", + "stats", col.Counter.Values()) + + updateStatus( + ctx, + col.statusUpdater, + len(col.added)+len(col.removed), + streamedItems, + 0, + col.FullPath().Folder(false), + errs.Failure()) + }() + + if len(col.added)+len(col.removed) > 0 { + colProgress = observe.CollectionProgress( + ctx, + col.Category().HumanString(), + col.LocationPath().Elements()) + defer close(colProgress) + } + + semaphoreCh := make(chan struct{}, col.Opts().Parallelism.ItemFetch) + defer close(semaphoreCh) + + // delete all removed items + for id := range col.removed { + semaphoreCh <- struct{}{} + + wg.Add(1) + + go func(id string) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + col.stream <- data.NewDeletedItem(id) + + atomic.AddInt64(&streamedItems, 1) + col.Counter.Inc(count.StreamItemsRemoved) + + if colProgress != nil { + colProgress <- struct{}{} + } + }(id) + } + + // add any new items + for id, modTime := range col.added { + if el.Failure() != nil { + break + } + + wg.Add(1) + semaphoreCh <- struct{}{} + + go func(id string, modTime time.Time) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + ictx := clues.Add( + ctx, + "item_id", id, + "parent_path", path.LoggableDir(col.LocationPath().String())) + + col.stream <- data.NewLazyItemWithInfo( + ictx, + &lazyItemGetter[C, I]{ + modTime: modTime, + getAndAugment: col.getAndAugment, + resourceID: col.protectedResource, + itemID: id, + containerIDs: col.FullPath().Folders(), + contains: col.contains, + parentPath: col.LocationPath().String(), + }, + id, + modTime, + col.Counter, + el) + + atomic.AddInt64(&streamedItems, 1) + + if colProgress != nil { + colProgress <- struct{}{} + } + }(id, modTime) + } + + wg.Wait() +} + +type lazyItemGetter[C graph.GetIDer, I groupsItemer] struct { + getAndAugment getItemAndAugmentInfoer[C, I] + resourceID string + itemID string + parentPath string + containerIDs path.Elements + modTime time.Time + contains container[C] +} + +func (lig *lazyItemGetter[C, I]) GetData( 
+ ctx context.Context, + errs *fault.Bus, +) (io.ReadCloser, *details.ItemInfo, bool, error) { + writer := kjson.NewJsonSerializationWriter() + defer writer.Close() + + item, info, err := lig.getAndAugment.getItem( + ctx, + lig.resourceID, + lig.containerIDs, + lig.itemID) + if err != nil { + // If an item was deleted then return an empty file so we don't fail + // the backup. Also return delInFlight as true so that kopia skips + // adding ItemInfo to details. + // + // The item will be deleted from kopia on the next backup when the + // delta token shows it's removed. + if graph.IsErrDeletedInFlight(err) { + logger.CtxErr(ctx, err).Info("item not found") + return nil, nil, true, nil + } + + err = clues.WrapWC(ctx, err, "getting item data").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + lig.getAndAugment.augmentItemInfo(info, lig.contains.container) + + if err := writer.WriteObjectValue("", item); err != nil { + err = clues.WrapWC(ctx, err, "writing item to serializer").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + itemData, err := writer.GetSerializedContent() + if err != nil { + err = clues.WrapWC(ctx, err, "serializing item").Label(fault.LabelForceNoBackupCreation) + errs.AddRecoverable(ctx, err) + + return nil, nil, false, err + } + + info.ParentPath = lig.parentPath + // Update the mod time to what we already told kopia about. This is required + // for proper details merging. 
+ info.Modified = lig.modTime + + return io.NopCloser(bytes.NewReader(itemData)), + &details.ItemInfo{Groups: info}, + false, + nil +} diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index 0bd298f10..7a7bda7f6 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "slices" "testing" "time" @@ -23,6 +24,7 @@ import ( "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) type CollectionUnitSuite struct { @@ -268,3 +270,350 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { }) } } + +type getAndAugmentConversation struct { + GetItemErr error + CallIDs []string +} + +//lint:ignore U1000 false linter issue due to generics +func (m *getAndAugmentConversation) getItem( + _ context.Context, + _ string, + _ path.Elements, + postID string, +) (models.Postable, *details.GroupsInfo, error) { + m.CallIDs = append(m.CallIDs, postID) + + p := models.NewPost() + p.SetId(ptr.To(postID)) + + return p, &details.GroupsInfo{}, m.GetItemErr +} + +// +//lint:ignore U1000 false linter issue due to generics +func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) { + // no-op +} + +func (m *getAndAugmentConversation) check(t *testing.T, expected []string) { + // Sort before comparing. We could use a set, but that would prevent us from + // detecting duplicates. 
+ slices.Sort(m.CallIDs) + slices.Sort(expected) + + assert.Equal(t, expected, m.CallIDs, "expected calls") +} + +func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { + var ( + t = suite.T() + start = time.Now().Add(-time.Second) + statusUpdater = func(*support.ControllerOperationStatus) {} + ) + + fullPath, err := path.Build( + "t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + locPath, err := path.Build( + "t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + table := []struct { + name string + added map[string]time.Time + removed map[string]struct{} + expectItemCount int + // Items we want to trigger lazy reader on. + expectReads []string + }{ + { + name: "no items", + }, + { + name: "only added items", + added: map[string]time.Time{ + "fisher": start.Add(time.Minute), + "flannigan": start.Add(2 * time.Minute), + "fitzbog": start.Add(3 * time.Minute), + }, + expectItemCount: 3, + expectReads: []string{ + "fisher", + "flannigan", + "fitzbog", + }, + }, + { + name: "only removed items", + removed: map[string]struct{}{ + "princess": {}, + "poppy": {}, + "petunia": {}, + }, + expectItemCount: 3, + }, + { + // TODO(pandeyabs): Overlaps between added and removed are deleted + // by NewCollection caller code. This is a slight deviation from how + // exchange does it. It's harmless but should be fixed for consistency. + // + // Since we are calling NewCollection here directly, we are not testing + // with overlaps, else those tests would fail. Same behavior exists for + // prefetch collections. 
+ name: "added and removed items", + added: map[string]time.Time{ + "goblin": {}, + }, + removed: map[string]struct{}{ + "general": {}, + "goose": {}, + "grumbles": {}, + }, + expectItemCount: 4, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + var ( + t = suite.T() + errs = fault.New(true) + itemCount int + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + getterAugmenter := &getAndAugmentConversation{} + defer getterAugmenter.check(t, test.expectReads) + + col := &lazyFetchCollection[models.Conversationable, models.Postable]{ + BaseCollection: data.NewBaseCollection( + fullPath, + nil, + locPath.ToBuilder(), + control.DefaultOptions(), + false, + count.New()), + added: test.added, + contains: container[models.Conversationable]{}, + removed: test.removed, + getAndAugment: getterAugmenter, + stream: make(chan data.Item), + statusUpdater: statusUpdater, + } + + for item := range col.Items(ctx, errs) { + itemCount++ + + _, rok := test.removed[item.ID()] + if rok { + assert.True(t, item.Deleted(), "removals should be marked as deleted") + dimt, ok := item.(data.ItemModTime) + require.True(t, ok, "item implements data.ItemModTime") + assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()") + } + + modTime, aok := test.added[item.ID()] + if !rok && aok { + // Item's mod time should be what's passed into the collection + // initializer. + assert.Implements(t, (*data.ItemModTime)(nil), item) + assert.Equal(t, modTime, item.(data.ItemModTime).ModTime(), "item mod time") + + assert.False(t, item.Deleted(), "additions should not be marked as deleted") + + // Check if the test wants us to read the item's data so the lazy + // data fetch is executed. 
+ if slices.Contains(test.expectReads, item.ID()) { + r := item.ToReader() + + _, err := io.ReadAll(r) + assert.NoError(t, err, clues.ToCore(err)) + + r.Close() + + assert.Implements(t, (*data.ItemInfo)(nil), item) + info, err := item.(data.ItemInfo).Info() + + // ItemInfo's mod time should match what was passed into the + // collection initializer. + assert.NoError(t, err, clues.ToCore(err)) + assert.Equal(t, modTime, info.Modified(), "ItemInfo mod time") + } + } + + assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID()) + } + + assert.NoError(t, errs.Failure()) + assert.Equal( + t, + test.expectItemCount, + itemCount, + "should see all expected items") + }) + } +} + +func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() { + var ( + parentPath = "thread/private/silly cats" + now = time.Now() + ) + + table := []struct { + name string + getErr error + expectReadErrType error + }{ + { + name: "ReturnsErrorOnGenericGetError", + getErr: assert.AnError, + expectReadErrType: assert.AnError, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + m := getAndAugmentConversation{ + GetItemErr: test.getErr, + } + + li := data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter[models.Conversationable, models.Postable]{ + resourceID: "resourceID", + itemID: "itemID", + getAndAugment: &m, + modTime: now, + parentPath: parentPath, + }, + "itemID", + now, + count.New(), + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal(t, now, li.ModTime(), "item mod time") + + _, err := readers.NewVersionedRestoreReader(li.ToReader()) + assert.ErrorIs(t, err, test.expectReadErrType) + + // Should get some form of error when trying to get info. 
+ _, err = li.Info() + assert.Error(t, err, "Info()") + }) + } +} + +func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() { + var ( + t = suite.T() + + parentPath = "thread/private/silly cats" + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + m := getAndAugmentConversation{ + GetItemErr: graph.ErrDeletedInFlight, + } + + li := data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter[models.Conversationable, models.Postable]{ + resourceID: "resourceID", + itemID: "itemID", + getAndAugment: &m, + modTime: now, + parentPath: parentPath, + }, + "itemID", + now, + count.New(), + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + require.NoError(t, err, clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.True(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) + + assert.Empty(t, readData, "read item data") + + _, err = li.Info() + assert.ErrorIs(t, err, data.ErrNotFound, "Info() error") +} + +func (suite *CollectionUnitSuite) TestLazyItem() { + var ( + t = suite.T() + + parentPath = "thread/private/silly cats" + now = time.Now() + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + m := getAndAugmentConversation{} + + li := data.NewLazyItemWithInfo( + ctx, + &lazyItemGetter[models.Conversationable, models.Postable]{ + resourceID: "resourceID", + itemID: "itemID", + getAndAugment: &m, + modTime: now, + parentPath: parentPath, + }, + "itemID", + now, + count.New(), + fault.New(true)) + + assert.False(t, li.Deleted(), "item shouldn't be marked deleted") + assert.Equal( + t, + now, + li.ModTime(), + "item mod time") + + r, err := readers.NewVersionedRestoreReader(li.ToReader()) + require.NoError(t, err, 
clues.ToCore(err)) + + assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version) + assert.False(t, r.Format().DelInFlight) + + readData, err := io.ReadAll(r) + assert.NoError(t, err, "reading item data: %v", clues.ToCore(err)) + + assert.NotEmpty(t, readData, "read item data") + + info, err := li.Info() + assert.NoError(t, err, "getting item info: %v", clues.ToCore(err)) + + assert.Equal(t, parentPath, info.Groups.ParentPath) + assert.Equal(t, now, info.Modified()) +}