Integrate lazy item reader into conversations backup (#4921)

<!-- PR description-->




---

#### Does this PR need a docs update or release note?

- [x] Yes, it's included

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature


#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the GitHub issue. -->
* Closes https://github.com/alcionai/corso/issues/4862

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-12-26 23:09:12 -08:00 committed by GitHub
parent abc5ec1942
commit c6b9615c7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 125 additions and 26 deletions

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
this case, Corso will skip over the item but report this in the backup summary. this case, Corso will skip over the item but report this in the backup summary.
- Guarantee Exchange email restoration when restoring multiple attachments. Some previous restores were failing with `ErrorItemNotFound`. - Guarantee Exchange email restoration when restoring multiple attachments. Some previous restores were failing with `ErrorItemNotFound`.
- Avoid Graph SDK `Requests must contain extension changes exclusively.` errors by removing server-populated field from restored event items. - Avoid Graph SDK `Requests must contain extension changes exclusively.` errors by removing server-populated field from restored event items.
- Improve Group mailbox (Conversations) backup performance by downloading only new items or items with modified content.
- Handle cases where Exchange backup stored invalid JSON blobs if there were special characters in the user content. These would result in errors during restore or restore errors. - Handle cases where Exchange backup stored invalid JSON blobs if there were special characters in the user content. These would result in errors during restore or restore errors.
### Known issues ### Known issues

View File

@ -22,13 +22,7 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api/graph" "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
) )
// TODO: incremental support // TODO: incremental support for channels
// multiple lines in this file are commented out so that
// we can focus on v0 backups and re-integrate them later
// for v1 incrementals.
// since these lines represent otherwise standard boilerplate,
// it's simpler to comment them for tracking than to delete
// and re-discover them later.
func CreateCollections[C graph.GetIDer, I groupsItemer]( func CreateCollections[C graph.GetIDer, I groupsItemer](
ctx context.Context, ctx context.Context,
@ -37,6 +31,7 @@ func CreateCollections[C graph.GetIDer, I groupsItemer](
tenantID string, tenantID string,
scope selectors.GroupsScope, scope selectors.GroupsScope,
su support.StatusUpdater, su support.StatusUpdater,
useLazyReader bool,
counter *count.Bus, counter *count.Bus,
errs *fault.Bus, errs *fault.Bus,
) ([]data.BackupCollection, bool, error) { ) ([]data.BackupCollection, bool, error) {
@ -76,6 +71,7 @@ func CreateCollections[C graph.GetIDer, I groupsItemer](
containers, containers,
scope, scope,
cdps[scope.Category().PathType()], cdps[scope.Category().PathType()],
useLazyReader,
bpc.Options, bpc.Options,
counter, counter,
errs) errs)
@ -98,6 +94,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer](
containers []container[C], containers []container[C],
scope selectors.GroupsScope, scope selectors.GroupsScope,
dps metadata.DeltaPaths, dps metadata.DeltaPaths,
useLazyReader bool,
ctrlOpts control.Options, ctrlOpts control.Options,
counter *count.Bus, counter *count.Bus,
errs *fault.Bus, errs *fault.Bus,
@ -212,9 +209,10 @@ func populateCollections[C graph.GetIDer, I groupsItemer](
addAndRem.Added, addAndRem.Added,
removed, removed,
c, c,
statusUpdater) statusUpdater,
useLazyReader)
collections[c.storageDirFolders.String()] = &edc collections[c.storageDirFolders.String()] = edc
// add the current path for the container ID to be used in the next backup // add the current path for the container ID to be used in the next backup
// as the "previous path", for reference in case of a rename or relocation. // as the "previous path", for reference in case of a rename or relocation.

View File

@ -275,6 +275,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
test.mock.containers(), test.mock.containers(),
selectors.NewGroupsBackup(nil).Channels(selectors.Any())[0], selectors.NewGroupsBackup(nil).Channels(selectors.Any())[0],
nil, nil,
false,
ctrlOpts, ctrlOpts,
count.New(), count.New(),
fault.New(true)) fault.New(true))
@ -435,6 +436,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() {
test.mock.containers(), test.mock.containers(),
allScope, allScope,
test.deltaPaths, test.deltaPaths,
false,
ctrlOpts, ctrlOpts,
count.New(), count.New(),
fault.New(true)) fault.New(true))
@ -559,6 +561,7 @@ func (suite *BackupIntgSuite) TestCreateCollections() {
suite.tenantID, suite.tenantID,
test.scope, test.scope,
func(status *support.ControllerOperationStatus) {}, func(status *support.ControllerOperationStatus) {},
false,
count.New(), count.New(),
fault.New(true)) fault.New(true))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))

View File

@ -23,7 +23,10 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api/graph" "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
) )
var _ data.BackupCollection = &prefetchCollection[graph.GetIDer, groupsItemer]{} var (
_ data.BackupCollection = &prefetchCollection[graph.GetIDer, groupsItemer]{}
_ data.BackupCollection = &lazyFetchCollection[graph.GetIDer, groupsItemer]{}
)
const ( const (
collectionChannelBufferSize = 1000 collectionChannelBufferSize = 1000
@ -91,8 +94,22 @@ func NewCollection[C graph.GetIDer, I groupsItemer](
removed map[string]struct{}, removed map[string]struct{},
contains container[C], contains container[C],
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
) prefetchCollection[C, I] { useLazyReader bool,
collection := prefetchCollection[C, I]{ ) data.BackupCollection {
if useLazyReader {
return &lazyFetchCollection[C, I]{
BaseCollection: baseCol,
added: added,
contains: contains,
getAndAugment: getAndAugment,
removed: removed,
statusUpdater: statusUpdater,
stream: make(chan data.Item, collectionChannelBufferSize),
protectedResource: protectedResource,
}
}
return &prefetchCollection[C, I]{
BaseCollection: baseCol, BaseCollection: baseCol,
added: added, added: added,
contains: contains, contains: contains,
@ -102,8 +119,6 @@ func NewCollection[C graph.GetIDer, I groupsItemer](
stream: make(chan data.Item, collectionChannelBufferSize), stream: make(chan data.Item, collectionChannelBufferSize),
protectedResource: protectedResource, protectedResource: protectedResource,
} }
return collection
} }
func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { func (col *prefetchCollection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {

View File

@ -133,11 +133,17 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
nil, nil,
nil, nil,
container[models.Channelable]{}, container[models.Channelable]{},
nil) nil,
false)
assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.expect, c.State(), "collection state")
assert.Equal(t, test.curr, c.FullPath(), "full path") assert.Equal(t, test.curr, c.FullPath(), "full path")
assert.Equal(t, test.prev, c.PreviousPath(), "prev path") assert.Equal(t, test.prev, c.PreviousPath(), "prev path")
assert.Equal(t, test.loc, c.LocationPath(), "location path")
prefetch, ok := c.(*prefetchCollection[models.Channelable, models.ChatMessageable])
require.True(t, ok, "collection type")
assert.Equal(t, test.loc, prefetch.LocationPath(), "location path")
}) })
} }
} }

View File

@ -281,6 +281,9 @@ func backupChannels(
bc.producerConfig.ProtectedResource.ID(), bc.producerConfig.ProtectedResource.ID(),
bc.apiCli.Channels()) bc.apiCli.Channels())
// Always disable lazy reader for channels until #4321 support is added
useLazyReader := false
colls, canUsePreviousBackup, err := groups.CreateCollections( colls, canUsePreviousBackup, err := groups.CreateCollections(
ctx, ctx,
bc.producerConfig, bc.producerConfig,
@ -288,6 +291,7 @@ func backupChannels(
bc.creds.AzureTenantID, bc.creds.AzureTenantID,
scope, scope,
bc.statusUpdater, bc.statusUpdater,
useLazyReader,
counter, counter,
errs) errs)
if err != nil { if err != nil {
@ -329,6 +333,8 @@ func backupConversations(
defer close(progressBar) defer close(progressBar)
useLazyReader := !bc.producerConfig.Options.ToggleFeatures.DisableLazyItemReader
colls, canUsePreviousBackup, err := groups.CreateCollections( colls, canUsePreviousBackup, err := groups.CreateCollections(
ctx, ctx,
bc.producerConfig, bc.producerConfig,
@ -336,6 +342,7 @@ func backupConversations(
bc.creds.AzureTenantID, bc.creds.AzureTenantID,
scope, scope,
bc.statusUpdater, bc.statusUpdater,
useLazyReader,
counter, counter,
errs) errs)
if err != nil { if err != nil {

View File

@ -4,9 +4,12 @@ import (
"context" "context"
"testing" "testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/events"
evmock "github.com/alcionai/corso/src/internal/events/mock" evmock "github.com/alcionai/corso/src/internal/events/mock"
"github.com/alcionai/corso/src/internal/m365/collection/drive" "github.com/alcionai/corso/src/internal/m365/collection/drive"
. "github.com/alcionai/corso/src/internal/operations/test/m365" . "github.com/alcionai/corso/src/internal/operations/test/m365"
@ -312,21 +315,27 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
sel.Include( sel.Include(
selTD.GroupsBackupLibraryFolderScope(sel), selTD.GroupsBackupLibraryFolderScope(sel),
selTD.GroupsBackupChannelScope(sel), selTD.GroupsBackupChannelScope(sel),
sel.Conversation(selectors.Any())) selTD.GroupsBackupConversationScope(sel))
bo, bod := PrepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter) bo, bod := PrepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup, counter)
defer bod.Close(t, ctx) defer bod.Close(t, ctx)
reasons, err := bod.Sel.Reasons(bod.Acct.ID(), false)
require.NoError(t, err, clues.ToCore(err))
RunAndCheckBackup(t, ctx, &bo, mb, false) RunAndCheckBackup(t, ctx, &bo, mb, false)
CheckBackupIsInManifests(
t, for _, reason := range reasons {
ctx, CheckBackupIsInManifests(
bod.KW, t,
bod.SW, ctx,
&bo, bod.KW,
bod.Sel, bod.SW,
bod.Sel.ID(), &bo,
path.ChannelMessagesCategory) bod.Sel,
bod.Sel.ID(),
reason.Category())
}
_, expectDeets := deeTD.GetDeetsInBackup( _, expectDeets := deeTD.GetDeetsInBackup(
t, t,
@ -347,6 +356,66 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
bod.SSS, bod.SSS,
expectDeets, expectDeets,
false) false)
// Basic, happy path incremental test. No changes are dictated or expected.
// This only tests that an incremental backup is runnable at all, and that it
// produces fewer results than the last backup.
//
// Incremental testing for conversations is limited because of API restrictions.
// Since graph doesn't provide us a way to programmatically delete conversations,
// or create new conversations without a delegated token, we can't do incremental
// testing with newly added items.
incMB := evmock.NewBus()
incBO := NewTestBackupOp(
t,
ctx,
bod,
incMB,
opts,
count.New())
RunAndCheckBackup(t, ctx, &incBO, incMB, true)
for _, reason := range reasons {
CheckBackupIsInManifests(
t,
ctx,
bod.KW,
bod.SW,
&incBO,
bod.Sel,
bod.Sel.ID(),
reason.Category())
}
_, expectDeets = deeTD.GetDeetsInBackup(
t,
ctx,
incBO.Results.BackupID,
bod.Acct.ID(),
bod.Sel.ID(),
bod.Sel.PathService(),
whatSet,
bod.KMS,
bod.SSS)
deeTD.CheckBackupDetails(
t,
ctx,
incBO.Results.BackupID,
whatSet,
bod.KMS,
bod.SSS,
expectDeets,
false)
assert.NotZero(
t,
incBO.Results.Counts[string(count.PersistedCachedFiles)],
"cached items")
assert.Greater(t, bo.Results.ItemsWritten, incBO.Results.ItemsWritten, "incremental items written")
assert.Greater(t, bo.Results.BytesRead, incBO.Results.BytesRead, "incremental bytes read")
assert.Greater(t, bo.Results.BytesUploaded, incBO.Results.BytesUploaded, "incremental bytes uploaded")
assert.Equal(t, 1, incMB.TimesCalled[events.BackupEnd], "incremental backup-end events")
} }
func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() { func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() {