diff --git a/src/cli/backup/backup.go b/src/cli/backup/backup.go index 1cee170d8..432e5798e 100644 --- a/src/cli/backup/backup.go +++ b/src/cli/backup/backup.go @@ -196,7 +196,7 @@ var defaultSelectorConfig = selectors.Config{OnlyMatchItemNames: true} func runBackups( ctx context.Context, r repository.Repository, - serviceName, resourceOwnerType string, + serviceName string, selectorSet []selectors.Selector, ins idname.Cacher, ) error { diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index d69ee3adf..298569da6 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -191,7 +191,7 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error { return runBackups( ctx, r, - "Exchange", "user", + "Exchange", selectorSet, ins) } diff --git a/src/cli/backup/groups.go b/src/cli/backup/groups.go index f4cc101f0..199227d61 100644 --- a/src/cli/backup/groups.go +++ b/src/cli/backup/groups.go @@ -3,6 +3,7 @@ package backup import ( "context" "errors" + "fmt" "github.com/alcionai/clues" "github.com/spf13/cobra" @@ -74,7 +75,7 @@ func addGroupsCommands(cmd *cobra.Command) *cobra.Command { // Flags addition ordering should follow the order we want them to appear in help and docs: flags.AddGroupFlag(c) - flags.AddDataFlag(c, []string{dataLibraries}, false) + flags.AddDataFlag(c, []string{flags.DataLibraries, flags.DataMessages}, false) flags.AddCorsoPassphaseFlags(c) flags.AddAWSCredsFlags(c) flags.AddAzureCredsFlags(c) @@ -176,7 +177,7 @@ func createGroupsCmd(cmd *cobra.Command, args []string) error { return runBackups( ctx, r, - "Group", "group", + "Group", selectorSet, ins) } @@ -315,19 +316,21 @@ func validateGroupsBackupCreateFlags(groups, cats []string) error { ) } - // TODO(meain) - // for _, d := range cats { - // if d != dataLibraries { - // return clues.New( - // d + " is an unrecognized data type; only " + dataLibraries + " is supported" - // ) - // } - // } + msg := fmt.Sprintf( + " is an unrecognized data type; only %s and %s are supported", + flags.DataLibraries, flags.DataMessages) + + allowedCats := utils.GroupsAllowedCategories() + + for _, d := range cats { + if _, ok := allowedCats[d]; !ok { + return clues.New(d + msg) + } + } return nil } -// TODO: users might specify a data type, this only supports AllData(). func groupsBackupCreateSelectors( ctx context.Context, ins idname.Cacher, @@ -339,27 +342,9 @@ func groupsBackupCreateSelectors( sel := selectors.NewGroupsBackup(slices.Clone(group)) - return addGroupsCategories(sel, cats) + return utils.AddGroupsCategories(sel, cats) } func includeAllGroupWithCategories(ins idname.Cacher, categories []string) *selectors.GroupsBackup { - return addGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories) -} - -func addGroupsCategories(sel *selectors.GroupsBackup, cats []string) *selectors.GroupsBackup { - if len(cats) == 0 { - sel.Include(sel.AllData()) - } - - // TODO(meain): handle filtering - // for _, d := range cats { - // switch d { - // case dataLibraries: - // sel.Include(sel.LibraryFolders(selectors.Any())) - // case dataPages: - // sel.Include(sel.Pages(selectors.Any())) - // } - // } - - return sel + return utils.AddGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories) } diff --git a/src/cli/backup/groups_test.go b/src/cli/backup/groups_test.go index 04a131b59..7c4602834 100644 --- a/src/cli/backup/groups_test.go +++ b/src/cli/backup/groups_test.go @@ -3,6 +3,7 @@ package backup import ( "testing" + "github.com/alcionai/clues" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,3 +97,43 @@ func (suite *GroupsUnitSuite) TestAddGroupsCommands() { }) } } + +func (suite *GroupsUnitSuite) TestValidateGroupsBackupCreateFlags() { + table := []struct { + name string + cats []string + expect assert.ErrorAssertionFunc + }{ + { + name: "none", + cats: []string{}, + expect: assert.NoError, + }, + { + name: "libraries", + cats: []string{flags.DataLibraries}, + expect: assert.NoError, + }, + { + name: "messages", + cats: []string{flags.DataMessages}, + expect: assert.NoError, + }, + { + name: "all allowed", + cats: []string{flags.DataLibraries, flags.DataMessages}, + expect: assert.NoError, + }, + { + name: "bad inputs", + cats: []string{"foo"}, + expect: assert.Error, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + err := validateGroupsBackupCreateFlags([]string{"*"}, test.cats) + test.expect(suite.T(), err, clues.ToCore(err)) + }) + } +} diff --git a/src/cli/backup/onedrive.go b/src/cli/backup/onedrive.go index 4e241323d..87a8a2236 100644 --- a/src/cli/backup/onedrive.go +++ b/src/cli/backup/onedrive.go @@ -172,7 +172,7 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error { return runBackups( ctx, r, - "OneDrive", "user", + "OneDrive", selectorSet, ins) } diff --git a/src/cli/backup/sharepoint.go b/src/cli/backup/sharepoint.go index 567c0878c..036e7e5cd 100644 --- a/src/cli/backup/sharepoint.go +++ b/src/cli/backup/sharepoint.go @@ -28,11 +28,6 @@ import ( // setup and globals // ------------------------------------------------------------------------------------------------ -const ( - dataLibraries = "libraries" - dataPages = "pages" -) - const ( sharePointServiceCommand = "sharepoint" sharePointServiceCommandCreateUseSuffix = "--site | '" + flags.Wildcard + "'" @@ -90,7 +85,7 @@ func addSharePointCommands(cmd *cobra.Command) *cobra.Command { flags.AddCorsoPassphaseFlags(c) flags.AddAWSCredsFlags(c) flags.AddAzureCredsFlags(c) - flags.AddDataFlag(c, []string{dataLibraries}, true) + flags.AddDataFlag(c, []string{flags.DataLibraries}, true) flags.AddFailFastFlag(c) flags.AddDisableIncrementalsFlag(c) flags.AddForceItemDataDownloadFlag(c) @@ -193,7 +188,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error { return runBackups( ctx, r, - "SharePoint", "site", + "SharePoint", selectorSet, ins) } @@ -208,9 +203,9 @@ func validateSharePointBackupCreateFlags(sites, weburls, cats []string) error { } for _, d := range cats { - if d != dataLibraries && d != dataPages { + if d != flags.DataLibraries && d != flags.DataPages { return clues.New( - d + " is an unrecognized data type; either " + dataLibraries + "or " + dataPages, + d + " is an unrecognized data type; either " + flags.DataLibraries + "or " + flags.DataPages, ) } } @@ -253,9 +248,9 @@ func addCategories(sel *selectors.SharePointBackup, cats []string) *selectors.Sh for _, d := range cats { switch d { - case dataLibraries: + case flags.DataLibraries: sel.Include(sel.LibraryFolders(selectors.Any())) - case dataPages: + case flags.DataPages: sel.Include(sel.Pages(selectors.Any())) } } diff --git a/src/cli/backup/sharepoint_test.go b/src/cli/backup/sharepoint_test.go index 8fadd064e..0a4956583 100644 --- a/src/cli/backup/sharepoint_test.go +++ b/src/cli/backup/sharepoint_test.go @@ -226,7 +226,7 @@ func (suite *SharePointUnitSuite) TestSharePointBackupCreateSelectors() { { name: "Pages", site: bothIDs, - data: []string{dataPages}, + data: []string{flags.DataPages}, expect: bothIDs, }, } diff --git a/src/cli/backup/teams.go b/src/cli/backup/teams.go index 97e314cfd..189a97deb 100644 --- a/src/cli/backup/teams.go +++ b/src/cli/backup/teams.go @@ -1,14 +1,24 @@ package backup import ( + "context" + "fmt" + "github.com/alcionai/clues" "github.com/spf13/cobra" "github.com/spf13/pflag" + "golang.org/x/exp/slices" "github.com/alcionai/corso/src/cli/flags" . "github.com/alcionai/corso/src/cli/print" + "github.com/alcionai/corso/src/cli/repo" "github.com/alcionai/corso/src/cli/utils" + "github.com/alcionai/corso/src/internal/common/idname" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" + "github.com/alcionai/corso/src/pkg/services/m365" ) // ------------------------------------------------------------------------------------------------ @@ -134,11 +144,38 @@ func createTeamsCmd(cmd *cobra.Command, args []string) error { return nil } - if err := validateTeamBackupCreateFlags(flags.TeamFV); err != nil { + if err := validateTeamsBackupCreateFlags(flags.TeamFV, flags.CategoryDataFV); err != nil { + return err + } + + r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.GroupsService, repo.S3Overrides(cmd)) + if err != nil { return Only(ctx, err) } - return Only(ctx, utils.ErrNotYetImplemented) + defer utils.CloseRepo(ctx, r) + + // TODO: log/print recoverable errors + errs := fault.New(false) + + ins, err := m365.GroupsMap(ctx, *acct, errs) + if err != nil { + return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 teams")) + } + + sel := teamsBackupCreateSelectors(ctx, ins, flags.TeamFV, flags.CategoryDataFV) + selectorSet := []selectors.Selector{} + + for _, discSel := range sel.SplitByResourceOwner(ins.IDs()) { + selectorSet = append(selectorSet, discSel.Selector) + } + + return runBackups( + ctx, + r, + "Group", + selectorSet, + ins) } // ------------------------------------------------------------------------------------------------ @@ -208,7 +245,7 @@ func deleteTeamsCmd(cmd *cobra.Command, args []string) error { // helpers // --------------------------------------------------------------------------- -func validateTeamBackupCreateFlags(teams []string) error { +func validateTeamsBackupCreateFlags(teams, cats []string) error { if len(teams) == 0 { return clues.New( "requires one or more --" + @@ -217,14 +254,35 @@ func validateTeamBackupCreateFlags(teams []string) error { ) } - // TODO(meain) - // for _, d := range cats { - // if d != dataLibraries { - // return clues.New( - // d + " is an unrecognized data type; only " + dataLibraries + " is supported" - // ) - // } - // } + msg := fmt.Sprintf( + " is an unrecognized data type; only %s and %s are supported", + flags.DataLibraries, flags.DataMessages) + + allowedCats := utils.GroupsAllowedCategories() + + for _, d := range cats { + if _, ok := allowedCats[d]; !ok { + return clues.New(d + msg) + } + } return nil } + +func teamsBackupCreateSelectors( + ctx context.Context, + ins idname.Cacher, + team, cats []string, +) *selectors.GroupsBackup { + if filters.PathContains(team).Compare(flags.Wildcard) { + return includeAllTeamWithCategories(ins, cats) + } + + sel := selectors.NewGroupsBackup(slices.Clone(team)) + + return utils.AddGroupsCategories(sel, cats) +} + +func includeAllTeamWithCategories(ins idname.Cacher, categories []string) *selectors.GroupsBackup { + return utils.AddGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories) +} diff --git a/src/cli/backup/teams_test.go b/src/cli/backup/teams_test.go index 966830f82..60b209db2 100644 --- a/src/cli/backup/teams_test.go +++ b/src/cli/backup/teams_test.go @@ -3,6 +3,7 @@ package backup import ( "testing" + "github.com/alcionai/clues" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,3 +97,43 @@ func (suite *TeamsUnitSuite) TestAddTeamsCommands() { }) } } + +func (suite *TeamsUnitSuite) TestValidateTeamsBackupCreateFlags() { + table := []struct { + name string + cats []string + expect assert.ErrorAssertionFunc + }{ + { + name: "none", + cats: []string{}, + expect: assert.NoError, + }, + { + name: "libraries", + cats: []string{flags.DataLibraries}, + expect: assert.NoError, + }, + { + name: "messages", + cats: []string{flags.DataMessages}, + expect: assert.NoError, + }, + { + name: "all allowed", + cats: []string{flags.DataLibraries, flags.DataMessages}, + expect: assert.NoError, + }, + { + name: "bad inputs", + cats: []string{"foo"}, + expect: assert.Error, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + err := validateTeamsBackupCreateFlags([]string{"*"}, test.cats) + test.expect(suite.T(), err, clues.ToCore(err)) + }) + } +} diff --git a/src/cli/flags/groups.go b/src/cli/flags/groups.go index 8aa6792ad..fc6c4803d 100644 --- a/src/cli/flags/groups.go +++ b/src/cli/flags/groups.go @@ -4,9 +4,9 @@ import ( "github.com/spf13/cobra" ) -const ( - GroupFN = "group" -) +const DataMessages = "messages" + +const GroupFN = "group" var GroupFV []string diff --git a/src/cli/flags/sharepoint.go b/src/cli/flags/sharepoint.go index 31ba29bff..d0cdbfe0b 100644 --- a/src/cli/flags/sharepoint.go +++ b/src/cli/flags/sharepoint.go @@ -4,6 +4,11 @@ import ( "github.com/spf13/cobra" ) +const ( + DataLibraries = "libraries" + DataPages = "pages" +) + const ( LibraryFN = "library" ListFolderFN = "list" diff --git a/src/cli/utils/groups.go b/src/cli/utils/groups.go index cabc9f3c6..2364c2d27 100644 --- a/src/cli/utils/groups.go +++ b/src/cli/utils/groups.go @@ -19,6 +19,30 @@ type GroupsOpts struct { Populated flags.PopulatedFlags } +func GroupsAllowedCategories() map[string]struct{} { + return map[string]struct{}{ + flags.DataLibraries: {}, + flags.DataMessages: {}, + } +} + +func AddGroupsCategories(sel *selectors.GroupsBackup, cats []string) *selectors.GroupsBackup { + if len(cats) == 0 { + sel.Include(sel.AllData()) + } + + for _, d := range cats { + switch d { + case flags.DataLibraries: + sel.Include(sel.LibraryFolders(selectors.Any())) + case flags.DataMessages: + sel.Include(sel.ChannelMessages(selectors.Any(), selectors.Any())) + } + } + + return sel +} + func MakeGroupsOpts(cmd *cobra.Command) GroupsOpts { return GroupsOpts{ Groups: flags.UserFV, diff --git a/src/cli/utils/groups_test.go b/src/cli/utils/groups_test.go index e2a48faf0..b90edad74 100644 --- a/src/cli/utils/groups_test.go +++ b/src/cli/utils/groups_test.go @@ -6,8 +6,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/cli/flags" "github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/selectors" ) type GroupsUtilsSuite struct { @@ -159,3 +161,44 @@ func (suite *GroupsUtilsSuite) TestValidateGroupsRestoreFlags() { }) } } + +func (suite *GroupsUtilsSuite) TestAddGroupsCategories() { + table := []struct { + name string + cats []string + expectScopeLen int + }{ + { + name: "none", + cats: []string{}, + expectScopeLen: 2, + }, + { + name: "libraries", + cats: []string{flags.DataLibraries}, + expectScopeLen: 1, + }, + { + name: "messages", + cats: []string{flags.DataMessages}, + expectScopeLen: 1, + }, + { + name: "all allowed", + cats: []string{flags.DataLibraries, flags.DataMessages}, + expectScopeLen: 2, + }, + { + name: "bad inputs", + cats: []string{"foo"}, + expectScopeLen: 0, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + sel := utils.AddGroupsCategories(selectors.NewGroupsBackup(selectors.Any()), test.cats) + scopes := sel.Scopes() + assert.Len(suite.T(), scopes, test.expectScopeLen) + }) + } +} diff --git a/src/internal/common/str/str.go b/src/internal/common/str/str.go index 2ab60ea2f..59f36933d 100644 --- a/src/internal/common/str/str.go +++ b/src/internal/common/str/str.go @@ -56,3 +56,27 @@ func First(vs ...string) string { return "" } + +// Preview reduces the string to the specified size. +// If the string is longer than the size, the last three +// characters are replaced with an ellipsis. Size < 4 +// will default to 4. +// ex: +// Preview("123", 6) => "123" +// Preview("1234567", 6) "123..." +func Preview(s string, size int) string { + if size < 4 { + size = 4 + } + + if len(s) < size { + return s + } + + ss := s[:size] + if len(s) > size { + ss = s[:size-3] + "..." + } + + return ss +} diff --git a/src/internal/common/str/str_test.go b/src/internal/common/str/str_test.go new file mode 100644 index 000000000..46a841966 --- /dev/null +++ b/src/internal/common/str/str_test.go @@ -0,0 +1,53 @@ +package str + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// --------------------------------------------------------------------------- +// Warning: importing the corso tester.suite causes a circular import +// --------------------------------------------------------------------------- + +func TestPreview(t *testing.T) { + table := []struct { + input string + size int + expect string + }{ + { + input: "", + size: 1, + expect: "", + }, + { + input: "yes", + size: 1, + expect: "yes", + }, + { + input: "yes!", + size: 5, + expect: "yes!", + }, + { + input: "however", + size: 6, + expect: "how...", + }, + { + input: "negative", + size: -1, + expect: "n...", + }, + } + for _, test := range table { + t.Run(test.input, func(t *testing.T) { + assert.Equal( + t, + test.expect, + Preview(test.input, test.size)) + }) + } +} diff --git a/src/internal/m365/collection/drive/handler_utils.go b/src/internal/m365/collection/drive/handler_utils.go index 6dc4be66e..9d0f973ad 100644 --- a/src/internal/m365/collection/drive/handler_utils.go +++ b/src/internal/m365/collection/drive/handler_utils.go @@ -90,7 +90,6 @@ func augmentItemInfo( } case path.GroupsService: - // TODO: Add channel name and ID dii.Groups = &details.GroupsInfo{ Created: ptr.Val(item.GetCreatedDateTime()), DriveID: driveID, diff --git a/src/internal/m365/collection/drive/restore.go b/src/internal/m365/collection/drive/restore.go index 855ab0580..544808d2e 100644 --- a/src/internal/m365/collection/drive/restore.go +++ b/src/internal/m365/collection/drive/restore.go @@ -521,8 +521,6 @@ func restoreV6File( return itemInfo, nil } - fmt.Printf("\n-----\nrestorev6 %+v\n-----\n", rcc.RestoreConfig.IncludePermissions) - err = RestorePermissions( ctx, rh, @@ -572,8 +570,6 @@ func CreateRestoreFolders( return id, nil } - fmt.Printf("\n-----\ncreatefolders %+v\n-----\n", restorePerms) - err = RestorePermissions( ctx, rh, diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index ba421763c..3137d5b07 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -39,15 +39,15 @@ const ( // Collection implements the interface from data.Collection // Structure holds data for an Exchange application for a single user type Collection struct { - user string - data chan data.Item + user string + stream chan data.Item // added is a list of existing item IDs that were added to a container added map[string]struct{} // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} - items itemGetterSerializer + getter itemGetterSerializer category path.CategoryType statusUpdater support.StatusUpdater @@ -91,10 +91,10 @@ func NewCollection( added: make(map[string]struct{}, 0), category: category, ctrl: ctrlOpts, - data: make(chan data.Item, collectionChannelBufferSize), + stream: make(chan data.Item, collectionChannelBufferSize), doNotMergeItems: doNotMergeItems, fullPath: curr, - items: items, + getter: items, locationPath: location, prevPath: prev, removed: make(map[string]struct{}, 0), @@ -110,7 +110,7 @@ func NewCollection( // M365 exchange objects and returns the data channel func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { go col.streamItems(ctx, errs) - return col.data + return col.stream } // FullPath returns the Collection's fullPath []string @@ -164,7 +164,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { if len(col.added)+len(col.removed) > 0 { colProgress = observe.CollectionProgress( ctx, - col.fullPath.Category().String(), + col.FullPath().Category().String(), col.LocationPath().Elements()) defer close(colProgress) } @@ -182,7 +182,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { defer wg.Done() defer func() { <-semaphoreCh }() - col.data <- &Item{ + col.stream <- &Item{ id: id, modTime: time.Now().UTC(), // removed items have no modTime entry. deleted: true, @@ -211,7 +211,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { defer wg.Done() defer func() { <-semaphoreCh }() - item, info, err := col.items.GetItem( + item, info, err := col.getter.GetItem( ctx, user, id, @@ -232,7 +232,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { return } - data, err := col.items.Serialize(ctx, item, user, id) + data, err := col.getter.Serialize(ctx, item, user, id) if err != nil { errs.AddRecoverable(ctx, clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation)) return @@ -244,9 +244,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { info.Size = int64(len(data)) } - info.ParentPath = col.locationPath.String() + info.ParentPath = col.LocationPath().String() - col.data <- &Item{ + col.stream <- &Item{ id: id, message: data, info: info, @@ -265,7 +265,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { wg.Wait() } -// terminatePopulateSequence is a utility function used to close a Collection's data channel +// finishPopulation is a utility function used to close a Collection's data channel // and to send the status update through the channel. func (col *Collection) finishPopulation( ctx context.Context, @@ -273,10 +273,11 @@ func (col *Collection) finishPopulation( totalBytes int64, err error, ) { - close(col.data) + close(col.stream) attempted := len(col.added) + len(col.removed) - status := support.CreateStatus(ctx, + status := support.CreateStatus( + ctx, support.Backup, 1, support.CollectionMetrics{ @@ -284,7 +285,7 @@ func (col *Collection) finishPopulation( Successes: success, Bytes: totalBytes, }, - col.fullPath.Folder(false)) + col.FullPath().Folder(false)) logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index f52f95958..03becd45f 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -81,29 +81,6 @@ func (suite *CollectionSuite) TestReader_Empty() { assert.NoError(t, err, clues.ToCore(err)) } -func (suite *CollectionSuite) TestColleciton_FullPath() { - t := suite.T() - tenant := "a-tenant" - user := "a-user" - folder := "a-folder" - - fullPath, err := path.Build( - tenant, - user, - path.ExchangeService, - path.EmailCategory, - false, - folder) - require.NoError(t, err, clues.ToCore(err)) - - edc := Collection{ - user: user, - fullPath: fullPath, - } - - assert.Equal(t, fullPath, edc.FullPath()) -} - func (suite *CollectionSuite) TestCollection_NewCollection() { t := suite.T() tenant := "a-tenant" diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index 550eb649e..8d5286106 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -5,6 +5,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" + "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" @@ -102,7 +103,7 @@ func populateCollections( // the process have been deleted. // tombstones = makeTombstones(dps) - logger.Ctx(ctx).Infow("filling collections") + logger.Ctx(ctx).Info("filling collections") // , "len_deltapaths", len(dps)) el := errs.Local() @@ -147,7 +148,7 @@ func populateCollections( // ictx = clues.Add(ictx, "previous_path", prevPath) - items, _, err := bh.getChannelMessagesDelta(ctx, cID, "") + items, _, err := bh.getChannelMessageIDsDelta(ctx, cID, "") if err != nil { el.AddRecoverable(ctx, clues.Stack(err)) continue @@ -168,6 +169,7 @@ func populateCollections( } edc := NewCollection( + bh, qp.ProtectedResource.ID(), currPath, prevPath, @@ -192,9 +194,7 @@ func populateCollections( // currPaths[cID] = currPath.String() // FIXME: normally this goes before removal, but the linters require no bottom comments - for _, item := range items { - edc.added[ptr.Val(item.GetId())] = struct{}{} - } + maps.Copy(edc.added, items) } // TODO: handle tombstones here diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index d604e65a2..68efddb03 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -21,6 +21,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -36,22 +37,25 @@ import ( var _ backupHandler = &mockBackupHandler{} type mockBackupHandler struct { - channels []models.Channelable - channelsErr error - messages []models.ChatMessageable - messagesErr error - doNotInclude bool + channels []models.Channelable + channelsErr error + messageIDs map[string]struct{} + messagesErr error + messages map[string]models.ChatMessageable + info map[string]*details.GroupsInfo + getMessageErr map[string]error + doNotInclude bool } func (bh mockBackupHandler) getChannels(context.Context) ([]models.Channelable, error) { return bh.channels, bh.channelsErr } -func (bh mockBackupHandler) getChannelMessagesDelta( +func (bh mockBackupHandler) getChannelMessageIDsDelta( _ context.Context, _, _ string, -) ([]models.ChatMessageable, api.DeltaUpdate, error) { - return bh.messages, api.DeltaUpdate{}, bh.messagesErr +) (map[string]struct{}, api.DeltaUpdate, error) { + return bh.messageIDs, api.DeltaUpdate{}, bh.messagesErr } func (bh mockBackupHandler) includeContainer( @@ -76,6 +80,13 @@ func (bh mockBackupHandler) canonicalPath( false) } +func (bh mockBackupHandler) getChannelMessage( + _ context.Context, + _, _, itemID string, +) (models.ChatMessageable, *details.GroupsInfo, error) { + return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID] +} + // --------------------------------------------------------------------------- // Unit Suite // --------------------------------------------------------------------------- @@ -121,8 +132,8 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { { name: "happy path, one container", mock: mockBackupHandler{ - channels: testdata.StubChannels("one"), - messages: testdata.StubChatMessages("msg-one"), + channels: testdata.StubChannels("one"), + messageIDs: map[string]struct{}{"msg-one": {}}, }, scope: allScope, expectErr: require.NoError, @@ -134,8 +145,8 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { { name: "happy path, many containers", mock: mockBackupHandler{ - channels: testdata.StubChannels("one", "two"), - messages: testdata.StubChatMessages("msg-one"), + channels: testdata.StubChannels("one", "two"), + messageIDs: map[string]struct{}{"msg-one": {}}, }, scope: allScope, expectErr: require.NoError, diff --git a/src/internal/m365/collection/groups/channel_handler.go b/src/internal/m365/collection/groups/channel_handler.go index 43c82d15b..1989426be 100644 --- a/src/internal/m365/collection/groups/channel_handler.go +++ b/src/internal/m365/collection/groups/channel_handler.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -35,11 +36,11 @@ func (bh channelsBackupHandler) getChannels( return bh.ac.GetChannels(ctx, bh.protectedResource) } -func (bh channelsBackupHandler) getChannelMessagesDelta( +func (bh channelsBackupHandler) getChannelMessageIDsDelta( ctx context.Context, channelID, prevDelta string, -) ([]models.ChatMessageable, api.DeltaUpdate, error) { - return bh.ac.GetChannelMessagesDelta(ctx, bh.protectedResource, channelID, prevDelta) +) (map[string]struct{}, api.DeltaUpdate, error) { + return bh.ac.GetChannelMessageIDsDelta(ctx, bh.protectedResource, channelID, prevDelta) } func (bh channelsBackupHandler) includeContainer( @@ -63,3 +64,10 @@ func (bh channelsBackupHandler) canonicalPath( path.ChannelMessagesCategory, false) } + +func (bh channelsBackupHandler) getChannelMessage( + ctx context.Context, + teamID, channelID, itemID string, +) (models.ChatMessageable, *details.GroupsInfo, error) { + return bh.ac.GetChannelMessage(ctx, teamID, channelID, itemID) +} diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index c1e6a4042..0bc3e8e05 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -4,13 +4,21 @@ import ( "bytes" "context" "io" + "sync" + "sync/atomic" "time" + "github.com/alcionai/clues" + kjson "github.com/microsoft/kiota-serialization-json-go" + + "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -28,14 +36,14 @@ const ( type Collection struct { protectedResource string - items chan data.Item + stream chan data.Item // added is a list of existing item IDs that were added to a container added map[string]struct{} // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} - // items itemGetterSerializer + getter getChannelMessager category path.CategoryType statusUpdater support.StatusUpdater @@ -66,6 +74,7 @@ type Collection struct { // If both are populated, then state is either moved (if they differ), // or notMoved (if they match). func NewCollection( + getter getChannelMessager, protectedResource string, curr, prev path.Path, location *path.Builder, @@ -75,17 +84,18 @@ func NewCollection( // doNotMergeItems bool, ) Collection { collection := Collection{ - added: make(map[string]struct{}, 0), + added: map[string]struct{}{}, category: category, ctrl: ctrlOpts, - items: make(chan data.Item, collectionChannelBufferSize), // doNotMergeItems: doNotMergeItems, fullPath: curr, + getter: getter, locationPath: location, prevPath: prev, removed: make(map[string]struct{}, 0), state: data.StateOf(prev, curr), statusUpdater: statusUpdater, + stream: make(chan data.Item, collectionChannelBufferSize), protectedResource: protectedResource, } @@ -95,8 +105,8 @@ func NewCollection( // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { - // go col.streamItems(ctx, errs) - return col.items + go col.streamItems(ctx, errs) + return col.stream } // FullPath returns the Collection's fullPath []string @@ -136,7 +146,7 @@ type Item struct { // going forward. Using []byte for now but I assume we'll have // some structured type in here (serialization to []byte can be done in `Read`) message []byte - info *details.ExchangeInfo // temporary change to bring populate function into directory + info *details.GroupsInfo // temporary change to bring populate function into directory // TODO(ashmrtn): Can probably eventually be sourced from info as there's a // request to provide modtime in ItemInfo structs. modTime time.Time @@ -158,7 +168,7 @@ func (i Item) Deleted() bool { } func (i *Item) Info() details.ItemInfo { - return details.ItemInfo{Exchange: i.info} + return details.ItemInfo{Groups: i.info} } func (i *Item) ModTime() time.Time { @@ -168,7 +178,7 @@ func (i *Item) ModTime() time.Time { func NewItem( identifier string, dataBytes []byte, - detail details.ExchangeInfo, + detail details.GroupsInfo, modTime time.Time, ) Item { return Item{ @@ -178,3 +188,146 @@ func NewItem( modTime: modTime, } } + +// --------------------------------------------------------------------------- +// items() production +// --------------------------------------------------------------------------- + +func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { + var ( + streamedItems int64 + totalBytes int64 + wg sync.WaitGroup + colProgress chan<- struct{} + el = errs.Local() + ) + + ctx = clues.Add(ctx, "category", col.category.String()) + + defer func() { + col.finishPopulation(ctx, streamedItems, totalBytes, errs.Failure()) + }() + + if len(col.added)+len(col.removed) > 0 { + colProgress = observe.CollectionProgress( + ctx, + col.FullPath().Category().String(), + col.LocationPath().Elements()) + defer close(colProgress) + } + + semaphoreCh := make(chan struct{}, col.ctrl.Parallelism.ItemFetch) + defer close(semaphoreCh) + + // TODO: add for v1 with incrementals + // delete all removed items + // for id := range col.removed { + // semaphoreCh <- struct{}{} + + // wg.Add(1) + + // go func(id string) { + // defer wg.Done() + // defer func() { <-semaphoreCh }() + + // col.stream <- &Item{ + // id: id, + // modTime: time.Now().UTC(), // removed items have no modTime entry. + // deleted: true, + // } + + // atomic.AddInt64(&streamedItems, 1) + // atomic.AddInt64(&totalBytes, 0) + + // if colProgress != nil { + // colProgress <- struct{}{} + // } + // }(id) + // } + + // add any new items + for id := range col.added { + if el.Failure() != nil { + break + } + + wg.Add(1) + semaphoreCh <- struct{}{} + + go func(id string) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + writer := kjson.NewJsonSerializationWriter() + defer writer.Close() + + flds := col.fullPath.Folders() + parentFolderID := flds[len(flds)-1] + + item, info, err := col.getter.getChannelMessage( + ctx, + col.protectedResource, + parentFolderID, + id) + if err != nil { + logger.CtxErr(ctx, err).Info("writing channel message to serializer") + return + } + + if err := writer.WriteObjectValue("", item); err != nil { + logger.CtxErr(ctx, err).Info("writing channel message to serializer") + return + } + + data, err := writer.GetSerializedContent() + if err != nil { + logger.CtxErr(ctx, err).Info("serializing channel message") + return + } + + info.ParentPath = col.LocationPath().String() + + col.stream <- &Item{ + id: ptr.Val(item.GetId()), + message: data, + info: info, + modTime: info.Modified, + } + + atomic.AddInt64(&streamedItems, 1) + atomic.AddInt64(&totalBytes, info.Size) + + if colProgress != nil { + colProgress <- struct{}{} + } + }(id) + } + + wg.Wait() +} + +// finishPopulation is a utility function used to close a Collection's data channel +// and to send the status update through the channel. +func (col *Collection) finishPopulation( + ctx context.Context, + streamedItems, totalBytes int64, + err error, +) { + close(col.stream) + + attempted := len(col.added) + len(col.removed) + status := support.CreateStatus( + ctx, + support.Backup, + 1, + support.CollectionMetrics{ + Objects: attempted, + Successes: int(streamedItems), + Bytes: totalBytes, + }, + col.FullPath().Folder(false)) + + logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) + + col.statusUpdater(status) +} diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go new file mode 100644 index 000000000..fe2aed34d --- /dev/null +++ b/src/internal/m365/collection/groups/collection_test.go @@ -0,0 +1,135 @@ +package groups + +import ( + "bytes" + "testing" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" +) + +type CollectionSuite struct { + tester.Suite +} + +func TestCollectionSuite(t *testing.T) { + suite.Run(t, &CollectionSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *CollectionSuite) TestReader_Valid() { + m := []byte("test message") + description := "aFile" + ed := &Item{id: description, message: m} + + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(ed.ToReader()) + assert.NoError(suite.T(), err, clues.ToCore(err)) + assert.Equal(suite.T(), buf.Bytes(), m) + assert.Equal(suite.T(), description, ed.ID()) +} + +func (suite *CollectionSuite) TestReader_Empty() { + var ( + empty []byte + expected int64 + t = suite.T() + ) + + ed := &Item{message: empty} + buf := &bytes.Buffer{} + received, err := buf.ReadFrom(ed.ToReader()) + + assert.Equal(t, expected, received) + assert.NoError(t, err, clues.ToCore(err)) +} + +func (suite *CollectionSuite) TestCollection_NewCollection() { + t := suite.T() + tenant := "a-tenant" + protectedResource := "a-protectedResource" + folder := "a-folder" + name := "protectedResource" + + fullPath, err := path.Build( + tenant, + protectedResource, + path.GroupsService, + path.ChannelMessagesCategory, + false, + folder) + require.NoError(t, err, clues.ToCore(err)) + + edc := Collection{ + protectedResource: name, + fullPath: fullPath, + } + assert.Equal(t, name, edc.protectedResource) + assert.Equal(t, fullPath, edc.FullPath()) +} + +func (suite *CollectionSuite) TestNewCollection_state() { + fooP, err := path.Build("t", "u", path.GroupsService, path.ChannelMessagesCategory, false, "foo") + require.NoError(suite.T(), err, clues.ToCore(err)) + barP, err := path.Build("t", "u", path.GroupsService, path.ChannelMessagesCategory, false, "bar") + require.NoError(suite.T(), err, clues.ToCore(err)) + + locPB := path.Builder{}.Append("human-readable") + + table := []struct { + name string + prev path.Path + curr path.Path + loc *path.Builder + expect data.CollectionState + }{ + { + name: "new", + curr: fooP, + loc: locPB, + expect: data.NewState, + }, + { + name: "not moved", + prev: fooP, + curr: fooP, + loc: locPB, + expect: data.NotMovedState, + }, + { + name: "moved", + prev: fooP, + curr: barP, + loc: locPB, + expect: data.MovedState, + }, + { + name: "deleted", + prev: fooP, + expect: data.DeletedState, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + c := NewCollection( + nil, + "g", + test.curr, test.prev, test.loc, + 0, + nil, + control.DefaultOptions()) + assert.Equal(t, test.expect, c.State(), "collection state") + assert.Equal(t, test.curr, c.fullPath, "full path") + assert.Equal(t, test.prev, c.prevPath, "prev path") + assert.Equal(t, test.loc, c.locationPath, "location path") + }) + } +} diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go index 6f9b19c3d..7309b2cd4 100644 --- a/src/internal/m365/collection/groups/handlers.go +++ b/src/internal/m365/collection/groups/handlers.go @@ -6,22 +6,25 @@ import ( "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" ) type backupHandler interface { + getChannelMessager + // gets all channels for the group getChannels( ctx context.Context, ) ([]models.Channelable, error) - // gets all messages by delta in the channel. - getChannelMessagesDelta( + // gets all message IDs by delta in the channel + getChannelMessageIDsDelta( ctx context.Context, channelID, prevDelta string, - ) ([]models.ChatMessageable, api.DeltaUpdate, error) + ) (map[string]struct{}, api.DeltaUpdate, error) // includeContainer evaluates whether the channel is included // in the provided scope. @@ -39,3 +42,10 @@ type backupHandler interface { tenantID string, ) (path.Path, error) } + +type getChannelMessager interface { + getChannelMessage( + ctx context.Context, + teamID, channelID, itemID string, + ) (models.ChatMessageable, *details.GroupsInfo, error) +} diff --git a/src/internal/m365/onedrive_test.go b/src/internal/m365/onedrive_test.go index 53b45be52..d53bfb2e8 100644 --- a/src/internal/m365/onedrive_test.go +++ b/src/internal/m365/onedrive_test.go @@ -861,8 +861,6 @@ func testRestoreNoPermissionsAndBackup(suite oneDriveSuite, startVersion int) { restoreCfg.OnCollision = control.Replace restoreCfg.IncludePermissions = false - fmt.Printf("\n-----\nrcfg %+v\n-----\n", restoreCfg.IncludePermissions) - runRestoreBackupTestVersions( t, testData, diff --git a/src/internal/operations/pathtransformer/restore_path_transformer.go b/src/internal/operations/pathtransformer/restore_path_transformer.go index 5dbb9bba3..847dfd4ab 100644 --- a/src/internal/operations/pathtransformer/restore_path_transformer.go +++ b/src/internal/operations/pathtransformer/restore_path_transformer.go @@ -129,7 +129,7 @@ func makeRestorePathsForEntry( // * Exchange Email/Contacts // * OneDrive/SharePoint (needs drive information) switch true { - case ent.Exchange != nil: + case ent.Exchange != nil || ent.Groups != nil: // TODO(ashmrtn): Eventually make Events have it's own function to handle // setting the restore destination properly. res.RestorePath, err = basicLocationPath(repoRef, locRef) diff --git a/src/internal/operations/test/group_test.go b/src/internal/operations/test/group_test.go index d2352919e..08393cd12 100644 --- a/src/internal/operations/test/group_test.go +++ b/src/internal/operations/test/group_test.go @@ -23,7 +23,7 @@ type GroupsBackupIntgSuite struct { } func TestGroupsBackupIntgSuite(t *testing.T) { - t.Skip("enable when groups e2e v0 backup is complete") + t.Skip("todo: enable") suite.Run(t, &GroupsBackupIntgSuite{ Suite: tester.NewIntegrationSuite( @@ -47,14 +47,16 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() { defer flush() var ( - mb = evmock.NewBus() - sel = selectors.NewGroupsBackup([]string{suite.its.group.ID}) - opts = control.DefaultOptions() + mb = evmock.NewBus() + sel = selectors.NewGroupsBackup([]string{suite.its.group.ID}) + opts = control.DefaultOptions() + whatSet = deeTD.CategoryFromRepoRef ) sel.Include( - selTD.GroupsBackupLibraryFolderScope(sel), - selTD.GroupsBackupChannelScope(sel)) // FIXME: channel backups are not ready + // TODO(abin): ensure implementation succeeds + // selTD.GroupsBackupLibraryFolderScope(sel), + selTD.GroupsBackupChannelScope(sel)) bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup) defer bod.close(t, ctx) @@ -68,7 +70,27 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() { &bo, bod.sel, bod.sel.ID(), - path.LibrariesCategory) + path.ChannelMessagesCategory) + + _, expectDeets := deeTD.GetDeetsInBackup( + t, + ctx, + bo.Results.BackupID, + bod.acct.ID(), + bod.sel.ID(), + path.GroupsService, + whatSet, + bod.kms, + bod.sss) + deeTD.CheckBackupDetails( + t, + ctx, + bo.Results.BackupID, + whatSet, + bod.kms, + bod.sss, + expectDeets, + false) } func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() { diff --git a/src/internal/operations/test/helper_test.go b/src/internal/operations/test/helper_test.go index 93dcf3e45..b6e79bcd8 100644 --- a/src/internal/operations/test/helper_test.go +++ b/src/internal/operations/test/helper_test.go @@ -132,22 +132,22 @@ func prepNewTestBackupOp( bod.sw = store.NewWrapper(bod.kms) - connectorResource := resource.Users + var connectorResource resource.Category - switch sel.Service { - case selectors.ServiceSharePoint: + switch sel.PathService() { + case path.SharePointService: connectorResource = resource.Sites - case selectors.ServiceGroups: + case path.GroupsService: connectorResource = resource.Groups + default: + connectorResource = resource.Users } bod.ctrl, bod.sel = ControllerWithSelector( t, ctx, bod.acct, - connectorResource, - sel, - nil, + connectorResource, sel, nil, bod.close) bo := newTestBackupOp( @@ -543,12 +543,12 @@ func ControllerWithSelector( t *testing.T, ctx context.Context, //revive:disable-line:context-as-argument acct account.Account, - cr resource.Category, + rc resource.Category, sel selectors.Selector, ins idname.Cacher, onFail func(*testing.T, context.Context), ) (*m365.Controller, selectors.Selector) { - ctrl, err := m365.NewController(ctx, acct, cr, sel.PathService(), control.DefaultOptions()) + ctrl, err := m365.NewController(ctx, acct, rc, sel.PathService(), control.DefaultOptions()) if !assert.NoError(t, err, clues.ToCore(err)) { if onFail != nil { onFail(t, ctx) diff --git a/src/pkg/backup/details/entry.go b/src/pkg/backup/details/entry.go index cfadd8641..07530c620 100644 --- a/src/pkg/backup/details/entry.go +++ b/src/pkg/backup/details/entry.go @@ -159,6 +159,10 @@ func (de Entry) Headers() []string { hs = append(hs, de.ItemInfo.OneDrive.Headers()...) } + if de.ItemInfo.Groups != nil { + hs = append(hs, de.ItemInfo.Groups.Headers()...) + } + return hs } @@ -182,5 +186,9 @@ func (de Entry) Values() []string { vs = append(vs, de.ItemInfo.OneDrive.Values()...) } + if de.ItemInfo.Groups != nil { + vs = append(vs, de.ItemInfo.Groups.Values()...) + } + return vs } diff --git a/src/pkg/backup/details/groups.go b/src/pkg/backup/details/groups.go index 92281b592..847b36fd6 100644 --- a/src/pkg/backup/details/groups.go +++ b/src/pkg/backup/details/groups.go @@ -47,8 +47,6 @@ type GroupsInfo struct { Size int64 `json:"size,omitempty"` // Channels Specific - ChannelName string `json:"channelName,omitempty"` - ChannelID string `json:"channelID,omitempty"` LastReplyAt time.Time `json:"lastResponseAt,omitempty"` MessageCreator string `json:"messageCreator,omitempty"` MessagePreview string `json:"messagePreview,omitempty"` @@ -67,7 +65,7 @@ func (i GroupsInfo) Headers() []string { switch i.ItemType { case SharePointLibrary: return []string{"ItemName", "Library", "ParentPath", "Size", "Owner", "Created", "Modified"} - case TeamsChannelMessage: + case GroupsChannelMessage: return []string{"Message", "Channel", "Replies", "Creator", "Created", "Last Response"} } @@ -88,10 +86,10 @@ func (i GroupsInfo) Values() []string { dttm.FormatToTabularDisplay(i.Created), dttm.FormatToTabularDisplay(i.Modified), } - case TeamsChannelMessage: + case GroupsChannelMessage: return []string{ i.MessagePreview, - i.ChannelName, + i.ParentPath, strconv.Itoa(i.ReplyCount), i.MessageCreator, dttm.FormatToTabularDisplay(i.Created), @@ -103,22 +101,25 @@ func (i GroupsInfo) Values() []string { } func (i *GroupsInfo) UpdateParentPath(newLocPath *path.Builder) { - i.ParentPath = newLocPath.PopFront().String() + i.ParentPath = newLocPath.String() } func (i *GroupsInfo) uniqueLocation(baseLoc *path.Builder) (*uniqueLoc, error) { - var category path.CategoryType + var ( + loc uniqueLoc + err error + ) switch i.ItemType { case SharePointLibrary: - category = path.LibrariesCategory - if len(i.DriveID) == 0 { return nil, clues.New("empty drive ID") } - } - loc, err := NewGroupsLocationIDer(category, i.DriveID, baseLoc.Elements()...) + loc, err = NewGroupsLocationIDer(path.LibrariesCategory, i.DriveID, baseLoc.Elements()...) + case GroupsChannelMessage: + loc, err = NewGroupsLocationIDer(path.ChannelMessagesCategory, "", baseLoc.Elements()...) + } return &loc, err } diff --git a/src/pkg/backup/details/iteminfo.go b/src/pkg/backup/details/iteminfo.go index ef71343dd..25a77d5c2 100644 --- a/src/pkg/backup/details/iteminfo.go +++ b/src/pkg/backup/details/iteminfo.go @@ -39,7 +39,7 @@ const ( FolderItem ItemType = 306 // Groups/Teams(40x) - TeamsChannelMessage ItemType = 401 + GroupsChannelMessage ItemType = 401 ) func UpdateItem(item *ItemInfo, newLocPath *path.Builder) { @@ -59,6 +59,8 @@ func UpdateItem(item *ItemInfo, newLocPath *path.Builder) { item.SharePoint.UpdateParentPath(newLocPath) } else if item.OneDrive != nil { item.OneDrive.UpdateParentPath(newLocPath) + } else if item.Groups != nil { + item.Groups.UpdateParentPath(newLocPath) } } @@ -93,6 +95,9 @@ func (i ItemInfo) infoType() ItemType { case i.OneDrive != nil: return i.OneDrive.ItemType + + case i.Groups != nil: + return i.Groups.ItemType } return UnknownType @@ -109,6 +114,9 @@ func (i ItemInfo) size() int64 { case i.SharePoint != nil: return i.SharePoint.Size + case i.Groups != nil: + return i.Groups.Size + case i.Folder != nil: return i.Folder.Size } @@ -127,6 +135,9 @@ func (i ItemInfo) Modified() time.Time { case i.SharePoint != nil: return i.SharePoint.Modified + case i.Groups != nil: + return i.Groups.Modified + case i.Folder != nil: return i.Folder.Modified } diff --git a/src/pkg/selectors/groups.go b/src/pkg/selectors/groups.go index 1bbd75f08..93e827694 100644 --- a/src/pkg/selectors/groups.go +++ b/src/pkg/selectors/groups.go @@ -409,7 +409,6 @@ const ( // channel and drive selection GroupsInfoSiteLibraryDrive groupsCategory = "GroupsInfoSiteLibraryDrive" - GroupsInfoChannel groupsCategory = "GroupsInfoChannel" // data contained within details.ItemInfo GroupsInfoChannelMessageCreatedAfter groupsCategory = "GroupsInfoChannelMessageCreatedAfter" @@ -672,18 +671,6 @@ func (s GroupsScope) matchesInfo(dii details.ItemInfo) bool { } return matchesAny(s, GroupsInfoSiteLibraryDrive, ds) - case GroupsInfoChannel: - ds := []string{} - - if len(info.ChannelID) > 0 { - ds = append(ds, info.ChannelID) - } - - if len(info.ChannelName) > 0 { - ds = append(ds, info.ChannelName) - } - - return matchesAny(s, GroupsInfoChannel, ds) case GroupsInfoChannelMessageCreator: i = info.MessageCreator case GroupsInfoChannelMessageCreatedAfter, GroupsInfoChannelMessageCreatedBefore: diff --git a/src/pkg/selectors/groups_test.go b/src/pkg/selectors/groups_test.go index dc5b6cc09..bfc7e080c 100644 --- a/src/pkg/selectors/groups_test.go +++ b/src/pkg/selectors/groups_test.go @@ -159,7 +159,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() { LocationRef: strings.Join(itemElems1, "/"), ItemInfo: details.ItemInfo{ Groups: &details.GroupsInfo{ - ItemType: details.TeamsChannelMessage, + ItemType: details.GroupsChannelMessage, ParentPath: strings.Join(itemElems1, "/"), }, }, @@ -170,7 +170,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() { // ItemRef intentionally blank to test fallback case ItemInfo: details.ItemInfo{ Groups: &details.GroupsInfo{ - ItemType: details.TeamsChannelMessage, + ItemType: details.GroupsChannelMessage, ParentPath: strings.Join(itemElems2, "/"), }, }, @@ -181,7 +181,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() { LocationRef: strings.Join(itemElems3, "/"), ItemInfo: details.ItemInfo{ Groups: &details.GroupsInfo{ - ItemType: details.TeamsChannelMessage, + ItemType: details.GroupsChannelMessage, ParentPath: strings.Join(itemElems3, "/"), }, }, @@ -366,7 +366,7 @@ func (suite *GroupsSelectorSuite) TestGroupsScope_MatchesInfo() { now = time.Now() modification = now.Add(15 * time.Minute) future = now.Add(45 * time.Minute) - dtch = details.TeamsChannelMessage + dtch = details.GroupsChannelMessage ) table := []struct { diff --git a/src/pkg/services/m365/api/channels.go b/src/pkg/services/m365/api/channels.go index 4e99480c7..8a176d4a3 100644 --- a/src/pkg/services/m365/api/channels.go +++ b/src/pkg/services/m365/api/channels.go @@ -3,16 +3,16 @@ package api import ( "context" "fmt" + "time" "github.com/alcionai/clues" - "github.com/microsoft/kiota-abstractions-go/serialization" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/teams" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/pkg/backup/details" - "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" ) @@ -104,12 +104,10 @@ func (c Channels) GetChannelByName( // message // --------------------------------------------------------------------------- -// GetMessage retrieves a ChannelMessage item. -func (c Channels) GetMessage( +func (c Channels) GetChannelMessage( ctx context.Context, - teamID, channelID, itemID string, - errs *fault.Bus, -) (serialization.Parsable, *details.GroupsInfo, error) { + teamID, channelID, messageID string, +) (models.ChatMessageable, *details.GroupsInfo, error) { var size int64 message, err := c.Stable. @@ -119,60 +117,68 @@ func (c Channels) GetMessage( Channels(). ByChannelIdString(channelID). Messages(). - ByChatMessageIdString(itemID). + ByChatMessageIdString(messageID). Get(ctx, nil) if err != nil { return nil, nil, graph.Stack(ctx, err) } - return message, ChannelMessageInfo(message, size), nil -} + info := ChannelMessageInfo(message, size) -// --------------------------------------------------------------------------- -// replies -// --------------------------------------------------------------------------- - -// GetReplies retrieves all replies to a Channel Message. -func (c Channels) GetReplies( - ctx context.Context, - teamID, channelID, messageID string, -) (serialization.Parsable, error) { - replies, err := c.Stable. - Client(). - Teams(). - ByTeamIdString(teamID). - Channels(). - ByChannelIdString(channelID). - Messages(). - ByChatMessageIdString(messageID). - Replies(). - Get(ctx, nil) - if err != nil { - return nil, graph.Stack(ctx, err) - } - - return replies, nil + return message, info, nil } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- -func ChannelMessageInfo(msg models.ChatMessageable, size int64) *details.GroupsInfo { - created := ptr.Val(msg.GetCreatedDateTime()) +func ChannelMessageInfo( + msg models.ChatMessageable, + size int64, +) *details.GroupsInfo { + var ( + lastReply time.Time + modTime = ptr.OrNow(msg.GetLastModifiedDateTime()) + msgCreator string + ) + + for _, r := range msg.GetReplies() { + cdt := ptr.Val(r.GetCreatedDateTime()) + if cdt.After(lastReply) { + lastReply = cdt + } + } + + // if the message hasn't been modified since before the most recent + // reply, set the modified time to the most recent reply. This ensures + // we update the message contents to match changes in replies. + if modTime.Before(lastReply) { + modTime = lastReply + } + + from := msg.GetFrom() + + switch true { + case from.GetApplication() != nil: + msgCreator = ptr.Val(from.GetApplication().GetDisplayName()) + case from.GetDevice() != nil: + msgCreator = ptr.Val(from.GetDevice().GetDisplayName()) + case from.GetUser() != nil: + msgCreator = ptr.Val(from.GetUser().GetDisplayName()) + } return &details.GroupsInfo{ - ItemType: details.TeamsChannelMessage, - Size: size, - Created: created, - Modified: ptr.OrNow(msg.GetLastModifiedDateTime()), + ItemType: details.GroupsChannelMessage, + Created: ptr.Val(msg.GetCreatedDateTime()), + LastReplyAt: lastReply, + Modified: modTime, + MessageCreator: msgCreator, + MessagePreview: str.Preview(ptr.Val(msg.GetBody().GetContent()), 16), + ReplyCount: len(msg.GetReplies()), + Size: size, } } -// --------------------------------------------------------------------------- -// helper funcs -// --------------------------------------------------------------------------- - // CheckIDAndName is a validator that ensures the ID // and name are populated and not zero valued. func CheckIDAndName(c models.Channelable) error { diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go index eeaa41486..c05355865 100644 --- a/src/pkg/services/m365/api/channels_pager.go +++ b/src/pkg/services/m365/api/channels_pager.go @@ -52,6 +52,7 @@ func (p *channelMessageDeltaPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessa func (c Channels) NewChannelMessageDeltaPager( teamID, channelID, prevDelta string, + selectProps ...string, ) *channelMessageDeltaPageCtrl { builder := c.Stable. Client(). @@ -67,7 +68,12 @@ func (c Channels) NewChannelMessageDeltaPager( } options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{ - Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{}, + Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + } + + if len(selectProps) > 0 { + options.QueryParameters.Select = selectProps } return &channelMessageDeltaPageCtrl{ @@ -79,13 +85,18 @@ func (c Channels) NewChannelMessageDeltaPager( } } -// GetChannelMessagesDelta fetches a delta of all messages in the channel. -func (c Channels) GetChannelMessagesDelta( +// GetChannelMessageIDsDelta fetches a delta of all messages in the channel. +func (c Channels) GetChannelMessageIDsDelta( ctx context.Context, teamID, channelID, prevDelta string, -) ([]models.ChatMessageable, DeltaUpdate, error) { +) (map[string]struct{}, DeltaUpdate, error) { var ( - vs = []models.ChatMessageable{} + vs = map[string]struct{}{} + // select is not currently allowed on messages + // this func will still isolate to the ID, however, + // because we need the follow-up get request to gather + // all replies to the message. + // selectProps = idAnd() pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta) invalidPrevDelta = len(prevDelta) == 0 newDeltaLink string @@ -98,7 +109,7 @@ func (c Channels) GetChannelMessagesDelta( logger.Ctx(ctx).Infow("Invalid previous delta", "delta_link", prevDelta) invalidPrevDelta = true - vs = []models.ChatMessageable{} + vs = map[string]struct{}{} pager.Reset(ctx) @@ -114,7 +125,9 @@ func (c Channels) GetChannelMessagesDelta( return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response") } - vs = append(vs, vals...) + for _, v := range vals { + vs[ptr.Val(v.GetId())] = struct{}{} + } nextLink, deltaLink := NextAndDeltaLink(page) @@ -139,6 +152,109 @@ func (c Channels) GetChannelMessagesDelta( return vs, du, nil } +// --------------------------------------------------------------------------- +// channel message replies pager +// --------------------------------------------------------------------------- + +var _ Pager[models.ChatMessageable] = &channelMessageRepliesPageCtrl{} + +type channelMessageRepliesPageCtrl struct { + gs graph.Servicer + builder *teams.ItemChannelsItemMessagesItemRepliesRequestBuilder + options *teams.ItemChannelsItemMessagesItemRepliesRequestBuilderGetRequestConfiguration +} + +func (p *channelMessageRepliesPageCtrl) SetNext(nextLink string) { + p.builder = teams.NewItemChannelsItemMessagesItemRepliesRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *channelMessageRepliesPageCtrl) GetPage( + ctx context.Context, +) (PageLinker, error) { + resp, err := p.builder.Get(ctx, p.options) + return resp, graph.Stack(ctx, err).OrNil() +} + +func (p *channelMessageRepliesPageCtrl) GetOdataNextLink() *string { + return ptr.To("") +} + +func (p *channelMessageRepliesPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessageable, error) { + return getValues[models.ChatMessageable](l) +} + +func (c Channels) NewChannelMessageRepliesPager( + teamID, channelID, messageID string, + selectProps ...string, +) *channelMessageRepliesPageCtrl { + options := &teams.ItemChannelsItemMessagesItemRepliesRequestBuilderGetRequestConfiguration{ + Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + } + + if len(selectProps) > 0 { + options.QueryParameters.Select = selectProps + } + + res := &channelMessageRepliesPageCtrl{ + gs: c.Stable, + options: options, + builder: c.Stable. + Client(). + Teams(). + ByTeamIdString(teamID). + Channels(). + ByChannelIdString(channelID). + Messages(). + ByChatMessageIdString(messageID). + Replies(), + } + + return res +} + +// GetChannels fetches the minimum valuable data from each reply in the message +func (c Channels) GetChannelMessageReplies( + ctx context.Context, + teamID, channelID, messageID string, +) ([]models.ChatMessageable, error) { + var ( + vs = []models.ChatMessageable{} + // select is not currently enabled for replies. + // selectProps = idAnd( + // "messageType", + // "createdDateTime", + // "from", + // "body") + pager = c.NewChannelMessageRepliesPager(teamID, channelID, messageID) + ) + + // Loop through all pages returned by Graph API. + for { + page, err := pager.GetPage(ctx) + if err != nil { + return nil, graph.Wrap(ctx, err, "retrieving page of channels") + } + + vals, err := pager.ValuesIn(page) + if err != nil { + return nil, graph.Wrap(ctx, err, "extracting channels from response") + } + + vs = append(vs, vals...) + + nextLink := ptr.Val(page.GetOdataNextLink()) + if len(nextLink) == 0 { + break + } + + pager.SetNext(nextLink) + } + + logger.Ctx(ctx).Debugf("retrieved %d channel message replies", len(vs)) + + return vs, nil +} + // --------------------------------------------------------------------------- // channel pager // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/channels_pager_test.go b/src/pkg/services/m365/api/channels_pager_test.go index 2e78a1787..a6a246ae5 100644 --- a/src/pkg/services/m365/api/channels_pager_test.go +++ b/src/pkg/services/m365/api/channels_pager_test.go @@ -2,13 +2,18 @@ package api_test import ( "testing" + "time" "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/services/m365/api" ) type ChannelsPagerIntgSuite struct { @@ -51,23 +56,79 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() { ctx, flush := tester.NewContext(t) defer flush() - msgs, du, err := ac.GetChannelMessagesDelta( + msgIDs, du, err := ac.GetChannelMessageIDsDelta( ctx, suite.its.group.id, suite.its.group.testContainerID, "") require.NoError(t, err, clues.ToCore(err)) - require.NotEmpty(t, msgs) + require.NotEmpty(t, msgIDs) require.NotZero(t, du.URL, "delta link") require.True(t, du.Reset, "reset due to empty prev delta link") - msgs, du, err = ac.GetChannelMessagesDelta( + msgIDs, du, err = ac.GetChannelMessageIDsDelta( ctx, suite.its.group.id, suite.its.group.testContainerID, du.URL) require.NoError(t, err, clues.ToCore(err)) - require.Empty(t, msgs, "should have no new messages from delta") + require.Empty(t, msgIDs, "should have no new messages from delta") require.NotZero(t, du.URL, "delta link") require.False(t, du.Reset, "prev delta link should be valid") + + for id := range msgIDs { + suite.Run(id+"-replies", func() { + testEnumerateChannelMessageReplies( + suite.T(), + suite.its.ac.Channels(), + suite.its.group.id, + suite.its.group.testContainerID, + id) + }) + } +} + +func testEnumerateChannelMessageReplies( + t *testing.T, + ac api.Channels, + groupID, channelID, messageID string, +) { + ctx, flush := tester.NewContext(t) + defer flush() + + msg, info, err := ac.GetChannelMessage(ctx, groupID, channelID, messageID) + require.NoError(t, err, clues.ToCore(err)) + + replies, err := ac.GetChannelMessageReplies(ctx, groupID, channelID, messageID) + require.NoError(t, err, clues.ToCore(err)) + + var ( + lastReply time.Time + replyIDs = map[string]struct{}{} + ) + + for _, r := range replies { + cdt := ptr.Val(r.GetCreatedDateTime()) + if cdt.After(lastReply) { + lastReply = cdt + } + + replyIDs[ptr.Val(r.GetId())] = struct{}{} + } + + assert.Equal(t, messageID, ptr.Val(msg.GetId())) + assert.Equal(t, channelID, ptr.Val(msg.GetChannelIdentity().GetChannelId())) + assert.Equal(t, groupID, ptr.Val(msg.GetChannelIdentity().GetTeamId())) + assert.Equal(t, len(replies), info.ReplyCount) + assert.Equal(t, msg.GetFrom().GetUser().GetDisplayName(), info.MessageCreator) + assert.Equal(t, lastReply, info.LastReplyAt) + assert.Equal(t, str.Preview(ptr.Val(msg.GetBody().GetContent()), 16), info.MessagePreview) + + msgReplyIDs := map[string]struct{}{} + + for _, reply := range msg.GetReplies() { + msgReplyIDs[ptr.Val(reply.GetId())] = struct{}{} + } + + assert.Equal(t, replyIDs, msgReplyIDs) } diff --git a/src/pkg/services/m365/api/config.go b/src/pkg/services/m365/api/config.go index a1c752686..efc6cd071 100644 --- a/src/pkg/services/m365/api/config.go +++ b/src/pkg/services/m365/api/config.go @@ -87,7 +87,13 @@ func newEventualConsistencyHeaders() *abstractions.RequestHeaders { // makes a slice with []string{"id", s...} func idAnd(ss ...string) []string { - return append([]string{"id"}, ss...) + id := []string{"id"} + + if len(ss) == 0 { + return id + } + + return append(id, ss...) } // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/events.go b/src/pkg/services/m365/api/events.go index 5ae03692e..3686bae0a 100644 --- a/src/pkg/services/m365/api/events.go +++ b/src/pkg/services/m365/api/events.go @@ -212,9 +212,10 @@ func (c Events) GetItem( // cancelledOccurrences end up in AdditionalData // https://learn.microsoft.com/en-us/graph/api/resources/event?view=graph-rest-beta#properties rawURL := fmt.Sprintf(eventExceptionsBetaURLTemplate, userID, itemID) - builder := users.NewItemEventsEventItemRequestBuilder(rawURL, c.Stable.Adapter()) - event, err = builder.Get(ctx, config) + event, err = users. + NewItemEventsEventItemRequestBuilder(rawURL, c.Stable.Adapter()). + Get(ctx, config) if err != nil { return nil, nil, graph.Stack(ctx, err) }