make teams and groups work in the cli (#4139)

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Issue(s)

* #3989

#### Test Plan

- [x] 💪 Manual
- [x]  Unit test
This commit is contained in:
Keepers 2023-09-05 15:14:34 -06:00 committed by GitHub
parent 5aee3cc2ae
commit 86e1cef3a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1018 additions and 242 deletions

View File

@ -196,7 +196,7 @@ var defaultSelectorConfig = selectors.Config{OnlyMatchItemNames: true}
func runBackups(
ctx context.Context,
r repository.Repository,
serviceName, resourceOwnerType string,
serviceName string,
selectorSet []selectors.Selector,
ins idname.Cacher,
) error {

View File

@ -191,7 +191,7 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error {
return runBackups(
ctx,
r,
"Exchange", "user",
"Exchange",
selectorSet,
ins)
}

View File

@ -3,6 +3,7 @@ package backup
import (
"context"
"errors"
"fmt"
"github.com/alcionai/clues"
"github.com/spf13/cobra"
@ -74,7 +75,7 @@ func addGroupsCommands(cmd *cobra.Command) *cobra.Command {
// Flags addition ordering should follow the order we want them to appear in help and docs:
flags.AddGroupFlag(c)
flags.AddDataFlag(c, []string{dataLibraries}, false)
flags.AddDataFlag(c, []string{flags.DataLibraries, flags.DataMessages}, false)
flags.AddCorsoPassphaseFlags(c)
flags.AddAWSCredsFlags(c)
flags.AddAzureCredsFlags(c)
@ -176,7 +177,7 @@ func createGroupsCmd(cmd *cobra.Command, args []string) error {
return runBackups(
ctx,
r,
"Group", "group",
"Group",
selectorSet,
ins)
}
@ -315,19 +316,21 @@ func validateGroupsBackupCreateFlags(groups, cats []string) error {
)
}
// TODO(meain)
// for _, d := range cats {
// if d != dataLibraries {
// return clues.New(
// d + " is an unrecognized data type; only " + dataLibraries + " is supported"
// )
// }
// }
msg := fmt.Sprintf(
" is an unrecognized data type; only %s and %s are supported",
flags.DataLibraries, flags.DataMessages)
allowedCats := utils.GroupsAllowedCategories()
for _, d := range cats {
if _, ok := allowedCats[d]; !ok {
return clues.New(d + msg)
}
}
return nil
}
// TODO: users might specify a data type, this only supports AllData().
func groupsBackupCreateSelectors(
ctx context.Context,
ins idname.Cacher,
@ -339,27 +342,9 @@ func groupsBackupCreateSelectors(
sel := selectors.NewGroupsBackup(slices.Clone(group))
return addGroupsCategories(sel, cats)
return utils.AddGroupsCategories(sel, cats)
}
func includeAllGroupWithCategories(ins idname.Cacher, categories []string) *selectors.GroupsBackup {
return addGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories)
}
func addGroupsCategories(sel *selectors.GroupsBackup, cats []string) *selectors.GroupsBackup {
if len(cats) == 0 {
sel.Include(sel.AllData())
}
// TODO(meain): handle filtering
// for _, d := range cats {
// switch d {
// case dataLibraries:
// sel.Include(sel.LibraryFolders(selectors.Any()))
// case dataPages:
// sel.Include(sel.Pages(selectors.Any()))
// }
// }
return sel
return utils.AddGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories)
}

View File

@ -3,6 +3,7 @@ package backup
import (
"testing"
"github.com/alcionai/clues"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -96,3 +97,43 @@ func (suite *GroupsUnitSuite) TestAddGroupsCommands() {
})
}
}
func (suite *GroupsUnitSuite) TestValidateGroupsBackupCreateFlags() {
table := []struct {
name string
cats []string
expect assert.ErrorAssertionFunc
}{
{
name: "none",
cats: []string{},
expect: assert.NoError,
},
{
name: "libraries",
cats: []string{flags.DataLibraries},
expect: assert.NoError,
},
{
name: "messages",
cats: []string{flags.DataMessages},
expect: assert.NoError,
},
{
name: "all allowed",
cats: []string{flags.DataLibraries, flags.DataMessages},
expect: assert.NoError,
},
{
name: "bad inputs",
cats: []string{"foo"},
expect: assert.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
err := validateGroupsBackupCreateFlags([]string{"*"}, test.cats)
test.expect(suite.T(), err, clues.ToCore(err))
})
}
}

View File

@ -172,7 +172,7 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error {
return runBackups(
ctx,
r,
"OneDrive", "user",
"OneDrive",
selectorSet,
ins)
}

View File

@ -28,11 +28,6 @@ import (
// setup and globals
// ------------------------------------------------------------------------------------------------
const (
dataLibraries = "libraries"
dataPages = "pages"
)
const (
sharePointServiceCommand = "sharepoint"
sharePointServiceCommandCreateUseSuffix = "--site <siteURL> | '" + flags.Wildcard + "'"
@ -90,7 +85,7 @@ func addSharePointCommands(cmd *cobra.Command) *cobra.Command {
flags.AddCorsoPassphaseFlags(c)
flags.AddAWSCredsFlags(c)
flags.AddAzureCredsFlags(c)
flags.AddDataFlag(c, []string{dataLibraries}, true)
flags.AddDataFlag(c, []string{flags.DataLibraries}, true)
flags.AddFailFastFlag(c)
flags.AddDisableIncrementalsFlag(c)
flags.AddForceItemDataDownloadFlag(c)
@ -193,7 +188,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error {
return runBackups(
ctx,
r,
"SharePoint", "site",
"SharePoint",
selectorSet,
ins)
}
@ -208,9 +203,9 @@ func validateSharePointBackupCreateFlags(sites, weburls, cats []string) error {
}
for _, d := range cats {
if d != dataLibraries && d != dataPages {
if d != flags.DataLibraries && d != flags.DataPages {
return clues.New(
d + " is an unrecognized data type; either " + dataLibraries + "or " + dataPages,
d + " is an unrecognized data type; either " + flags.DataLibraries + "or " + flags.DataPages,
)
}
}
@ -253,9 +248,9 @@ func addCategories(sel *selectors.SharePointBackup, cats []string) *selectors.Sh
for _, d := range cats {
switch d {
case dataLibraries:
case flags.DataLibraries:
sel.Include(sel.LibraryFolders(selectors.Any()))
case dataPages:
case flags.DataPages:
sel.Include(sel.Pages(selectors.Any()))
}
}

View File

@ -226,7 +226,7 @@ func (suite *SharePointUnitSuite) TestSharePointBackupCreateSelectors() {
{
name: "Pages",
site: bothIDs,
data: []string{dataPages},
data: []string{flags.DataPages},
expect: bothIDs,
},
}

View File

@ -1,14 +1,24 @@
package backup
import (
"context"
"fmt"
"github.com/alcionai/clues"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/exp/slices"
"github.com/alcionai/corso/src/cli/flags"
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/repo"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365"
)
// ------------------------------------------------------------------------------------------------
@ -134,11 +144,38 @@ func createTeamsCmd(cmd *cobra.Command, args []string) error {
return nil
}
if err := validateTeamBackupCreateFlags(flags.TeamFV); err != nil {
if err := validateTeamsBackupCreateFlags(flags.TeamFV, flags.CategoryDataFV); err != nil {
return err
}
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.GroupsService, repo.S3Overrides(cmd))
if err != nil {
return Only(ctx, err)
}
return Only(ctx, utils.ErrNotYetImplemented)
defer utils.CloseRepo(ctx, r)
// TODO: log/print recoverable errors
errs := fault.New(false)
ins, err := m365.GroupsMap(ctx, *acct, errs)
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 teams"))
}
sel := teamsBackupCreateSelectors(ctx, ins, flags.TeamFV, flags.CategoryDataFV)
selectorSet := []selectors.Selector{}
for _, discSel := range sel.SplitByResourceOwner(ins.IDs()) {
selectorSet = append(selectorSet, discSel.Selector)
}
return runBackups(
ctx,
r,
"Group",
selectorSet,
ins)
}
// ------------------------------------------------------------------------------------------------
@ -208,7 +245,7 @@ func deleteTeamsCmd(cmd *cobra.Command, args []string) error {
// helpers
// ---------------------------------------------------------------------------
func validateTeamBackupCreateFlags(teams []string) error {
func validateTeamsBackupCreateFlags(teams, cats []string) error {
if len(teams) == 0 {
return clues.New(
"requires one or more --" +
@ -217,14 +254,35 @@ func validateTeamBackupCreateFlags(teams []string) error {
)
}
// TODO(meain)
// for _, d := range cats {
// if d != dataLibraries {
// return clues.New(
// d + " is an unrecognized data type; only " + dataLibraries + " is supported"
// )
// }
// }
msg := fmt.Sprintf(
" is an unrecognized data type; only %s and %s are supported",
flags.DataLibraries, flags.DataMessages)
allowedCats := utils.GroupsAllowedCategories()
for _, d := range cats {
if _, ok := allowedCats[d]; !ok {
return clues.New(d + msg)
}
}
return nil
}
func teamsBackupCreateSelectors(
ctx context.Context,
ins idname.Cacher,
team, cats []string,
) *selectors.GroupsBackup {
if filters.PathContains(team).Compare(flags.Wildcard) {
return includeAllTeamWithCategories(ins, cats)
}
sel := selectors.NewGroupsBackup(slices.Clone(team))
return utils.AddGroupsCategories(sel, cats)
}
func includeAllTeamWithCategories(ins idname.Cacher, categories []string) *selectors.GroupsBackup {
return utils.AddGroupsCategories(selectors.NewGroupsBackup(ins.IDs()), categories)
}

View File

@ -3,6 +3,7 @@ package backup
import (
"testing"
"github.com/alcionai/clues"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -96,3 +97,43 @@ func (suite *TeamsUnitSuite) TestAddTeamsCommands() {
})
}
}
func (suite *TeamsUnitSuite) TestValidateTeamsBackupCreateFlags() {
table := []struct {
name string
cats []string
expect assert.ErrorAssertionFunc
}{
{
name: "none",
cats: []string{},
expect: assert.NoError,
},
{
name: "libraries",
cats: []string{flags.DataLibraries},
expect: assert.NoError,
},
{
name: "messages",
cats: []string{flags.DataMessages},
expect: assert.NoError,
},
{
name: "all allowed",
cats: []string{flags.DataLibraries, flags.DataMessages},
expect: assert.NoError,
},
{
name: "bad inputs",
cats: []string{"foo"},
expect: assert.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
err := validateTeamsBackupCreateFlags([]string{"*"}, test.cats)
test.expect(suite.T(), err, clues.ToCore(err))
})
}
}

View File

@ -4,9 +4,9 @@ import (
"github.com/spf13/cobra"
)
const (
GroupFN = "group"
)
const DataMessages = "messages"
const GroupFN = "group"
var GroupFV []string

View File

@ -4,6 +4,11 @@ import (
"github.com/spf13/cobra"
)
const (
DataLibraries = "libraries"
DataPages = "pages"
)
const (
LibraryFN = "library"
ListFolderFN = "list"

View File

@ -19,6 +19,30 @@ type GroupsOpts struct {
Populated flags.PopulatedFlags
}
func GroupsAllowedCategories() map[string]struct{} {
return map[string]struct{}{
flags.DataLibraries: {},
flags.DataMessages: {},
}
}
func AddGroupsCategories(sel *selectors.GroupsBackup, cats []string) *selectors.GroupsBackup {
if len(cats) == 0 {
sel.Include(sel.AllData())
}
for _, d := range cats {
switch d {
case flags.DataLibraries:
sel.Include(sel.LibraryFolders(selectors.Any()))
case flags.DataMessages:
sel.Include(sel.ChannelMessages(selectors.Any(), selectors.Any()))
}
}
return sel
}
func MakeGroupsOpts(cmd *cobra.Command) GroupsOpts {
return GroupsOpts{
Groups: flags.UserFV,

View File

@ -6,8 +6,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/cli/flags"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/selectors"
)
type GroupsUtilsSuite struct {
@ -159,3 +161,44 @@ func (suite *GroupsUtilsSuite) TestValidateGroupsRestoreFlags() {
})
}
}
func (suite *GroupsUtilsSuite) TestAddGroupsCategories() {
table := []struct {
name string
cats []string
expectScopeLen int
}{
{
name: "none",
cats: []string{},
expectScopeLen: 2,
},
{
name: "libraries",
cats: []string{flags.DataLibraries},
expectScopeLen: 1,
},
{
name: "messages",
cats: []string{flags.DataMessages},
expectScopeLen: 1,
},
{
name: "all allowed",
cats: []string{flags.DataLibraries, flags.DataMessages},
expectScopeLen: 2,
},
{
name: "bad inputs",
cats: []string{"foo"},
expectScopeLen: 0,
},
}
for _, test := range table {
suite.Run(test.name, func() {
sel := utils.AddGroupsCategories(selectors.NewGroupsBackup(selectors.Any()), test.cats)
scopes := sel.Scopes()
assert.Len(suite.T(), scopes, test.expectScopeLen)
})
}
}

View File

@ -56,3 +56,27 @@ func First(vs ...string) string {
return ""
}
// Preview reduces the string to the specified size.
// If the string is longer than the size, the last three
// characters are replaced with an ellipsis. Size < 4
// will default to 4.
// ex:
// Preview("123", 6) => "123"
// Preview("1234567", 6) "123..."
func Preview(s string, size int) string {
if size < 4 {
size = 4
}
if len(s) < size {
return s
}
ss := s[:size]
if len(s) > size {
ss = s[:size-3] + "..."
}
return ss
}

View File

@ -0,0 +1,53 @@
package str
import (
"testing"
"github.com/stretchr/testify/assert"
)
// ---------------------------------------------------------------------------
// Warning: importing the corso tester.suite causes a circular import
// ---------------------------------------------------------------------------
func TestPreview(t *testing.T) {
table := []struct {
input string
size int
expect string
}{
{
input: "",
size: 1,
expect: "",
},
{
input: "yes",
size: 1,
expect: "yes",
},
{
input: "yes!",
size: 5,
expect: "yes!",
},
{
input: "however",
size: 6,
expect: "how...",
},
{
input: "negative",
size: -1,
expect: "n...",
},
}
for _, test := range table {
t.Run(test.input, func(t *testing.T) {
assert.Equal(
t,
test.expect,
Preview(test.input, test.size))
})
}
}

View File

@ -90,7 +90,6 @@ func augmentItemInfo(
}
case path.GroupsService:
// TODO: Add channel name and ID
dii.Groups = &details.GroupsInfo{
Created: ptr.Val(item.GetCreatedDateTime()),
DriveID: driveID,

View File

@ -521,8 +521,6 @@ func restoreV6File(
return itemInfo, nil
}
fmt.Printf("\n-----\nrestorev6 %+v\n-----\n", rcc.RestoreConfig.IncludePermissions)
err = RestorePermissions(
ctx,
rh,
@ -572,8 +570,6 @@ func CreateRestoreFolders(
return id, nil
}
fmt.Printf("\n-----\ncreatefolders %+v\n-----\n", restorePerms)
err = RestorePermissions(
ctx,
rh,

View File

@ -40,14 +40,14 @@ const (
// Structure holds data for an Exchange application for a single user
type Collection struct {
user string
data chan data.Item
stream chan data.Item
// added is a list of existing item IDs that were added to a container
added map[string]struct{}
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
items itemGetterSerializer
getter itemGetterSerializer
category path.CategoryType
statusUpdater support.StatusUpdater
@ -91,10 +91,10 @@ func NewCollection(
added: make(map[string]struct{}, 0),
category: category,
ctrl: ctrlOpts,
data: make(chan data.Item, collectionChannelBufferSize),
stream: make(chan data.Item, collectionChannelBufferSize),
doNotMergeItems: doNotMergeItems,
fullPath: curr,
items: items,
getter: items,
locationPath: location,
prevPath: prev,
removed: make(map[string]struct{}, 0),
@ -110,7 +110,7 @@ func NewCollection(
// M365 exchange objects and returns the data channel
func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
go col.streamItems(ctx, errs)
return col.data
return col.stream
}
// FullPath returns the Collection's fullPath []string
@ -164,7 +164,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
if len(col.added)+len(col.removed) > 0 {
colProgress = observe.CollectionProgress(
ctx,
col.fullPath.Category().String(),
col.FullPath().Category().String(),
col.LocationPath().Elements())
defer close(colProgress)
}
@ -182,7 +182,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
defer wg.Done()
defer func() { <-semaphoreCh }()
col.data <- &Item{
col.stream <- &Item{
id: id,
modTime: time.Now().UTC(), // removed items have no modTime entry.
deleted: true,
@ -211,7 +211,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
defer wg.Done()
defer func() { <-semaphoreCh }()
item, info, err := col.items.GetItem(
item, info, err := col.getter.GetItem(
ctx,
user,
id,
@ -232,7 +232,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
return
}
data, err := col.items.Serialize(ctx, item, user, id)
data, err := col.getter.Serialize(ctx, item, user, id)
if err != nil {
errs.AddRecoverable(ctx, clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation))
return
@ -244,9 +244,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
info.Size = int64(len(data))
}
info.ParentPath = col.locationPath.String()
info.ParentPath = col.LocationPath().String()
col.data <- &Item{
col.stream <- &Item{
id: id,
message: data,
info: info,
@ -265,7 +265,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
wg.Wait()
}
// terminatePopulateSequence is a utility function used to close a Collection's data channel
// finishPopulation is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(
ctx context.Context,
@ -273,10 +273,11 @@ func (col *Collection) finishPopulation(
totalBytes int64,
err error,
) {
close(col.data)
close(col.stream)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(ctx,
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
@ -284,7 +285,7 @@ func (col *Collection) finishPopulation(
Successes: success,
Bytes: totalBytes,
},
col.fullPath.Folder(false))
col.FullPath().Folder(false))
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())

View File

@ -81,29 +81,6 @@ func (suite *CollectionSuite) TestReader_Empty() {
assert.NoError(t, err, clues.ToCore(err))
}
func (suite *CollectionSuite) TestColleciton_FullPath() {
t := suite.T()
tenant := "a-tenant"
user := "a-user"
folder := "a-folder"
fullPath, err := path.Build(
tenant,
user,
path.ExchangeService,
path.EmailCategory,
false,
folder)
require.NoError(t, err, clues.ToCore(err))
edc := Collection{
user: user,
fullPath: fullPath,
}
assert.Equal(t, fullPath, edc.FullPath())
}
func (suite *CollectionSuite) TestCollection_NewCollection() {
t := suite.T()
tenant := "a-tenant"

View File

@ -5,6 +5,7 @@ import (
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
@ -102,7 +103,7 @@ func populateCollections(
// the process have been deleted.
// tombstones = makeTombstones(dps)
logger.Ctx(ctx).Infow("filling collections")
logger.Ctx(ctx).Info("filling collections")
// , "len_deltapaths", len(dps))
el := errs.Local()
@ -147,7 +148,7 @@ func populateCollections(
// ictx = clues.Add(ictx, "previous_path", prevPath)
items, _, err := bh.getChannelMessagesDelta(ctx, cID, "")
items, _, err := bh.getChannelMessageIDsDelta(ctx, cID, "")
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
continue
@ -168,6 +169,7 @@ func populateCollections(
}
edc := NewCollection(
bh,
qp.ProtectedResource.ID(),
currPath,
prevPath,
@ -192,9 +194,7 @@ func populateCollections(
// currPaths[cID] = currPath.String()
// FIXME: normally this goes before removal, but the linters require no bottom comments
for _, item := range items {
edc.added[ptr.Val(item.GetId())] = struct{}{}
}
maps.Copy(edc.added, items)
}
// TODO: handle tombstones here

View File

@ -21,6 +21,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -38,8 +39,11 @@ var _ backupHandler = &mockBackupHandler{}
type mockBackupHandler struct {
channels []models.Channelable
channelsErr error
messages []models.ChatMessageable
messageIDs map[string]struct{}
messagesErr error
messages map[string]models.ChatMessageable
info map[string]*details.GroupsInfo
getMessageErr map[string]error
doNotInclude bool
}
@ -47,11 +51,11 @@ func (bh mockBackupHandler) getChannels(context.Context) ([]models.Channelable,
return bh.channels, bh.channelsErr
}
func (bh mockBackupHandler) getChannelMessagesDelta(
func (bh mockBackupHandler) getChannelMessageIDsDelta(
_ context.Context,
_, _ string,
) ([]models.ChatMessageable, api.DeltaUpdate, error) {
return bh.messages, api.DeltaUpdate{}, bh.messagesErr
) (map[string]struct{}, api.DeltaUpdate, error) {
return bh.messageIDs, api.DeltaUpdate{}, bh.messagesErr
}
func (bh mockBackupHandler) includeContainer(
@ -76,6 +80,13 @@ func (bh mockBackupHandler) canonicalPath(
false)
}
func (bh mockBackupHandler) getChannelMessage(
_ context.Context,
_, _, itemID string,
) (models.ChatMessageable, *details.GroupsInfo, error) {
return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID]
}
// ---------------------------------------------------------------------------
// Unit Suite
// ---------------------------------------------------------------------------
@ -122,7 +133,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
name: "happy path, one container",
mock: mockBackupHandler{
channels: testdata.StubChannels("one"),
messages: testdata.StubChatMessages("msg-one"),
messageIDs: map[string]struct{}{"msg-one": {}},
},
scope: allScope,
expectErr: require.NoError,
@ -135,7 +146,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
name: "happy path, many containers",
mock: mockBackupHandler{
channels: testdata.StubChannels("one", "two"),
messages: testdata.StubChatMessages("msg-one"),
messageIDs: map[string]struct{}{"msg-one": {}},
},
scope: allScope,
expectErr: require.NoError,

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -35,11 +36,11 @@ func (bh channelsBackupHandler) getChannels(
return bh.ac.GetChannels(ctx, bh.protectedResource)
}
func (bh channelsBackupHandler) getChannelMessagesDelta(
func (bh channelsBackupHandler) getChannelMessageIDsDelta(
ctx context.Context,
channelID, prevDelta string,
) ([]models.ChatMessageable, api.DeltaUpdate, error) {
return bh.ac.GetChannelMessagesDelta(ctx, bh.protectedResource, channelID, prevDelta)
) (map[string]struct{}, api.DeltaUpdate, error) {
return bh.ac.GetChannelMessageIDsDelta(ctx, bh.protectedResource, channelID, prevDelta)
}
func (bh channelsBackupHandler) includeContainer(
@ -63,3 +64,10 @@ func (bh channelsBackupHandler) canonicalPath(
path.ChannelMessagesCategory,
false)
}
func (bh channelsBackupHandler) getChannelMessage(
ctx context.Context,
teamID, channelID, itemID string,
) (models.ChatMessageable, *details.GroupsInfo, error) {
return bh.ac.GetChannelMessage(ctx, teamID, channelID, itemID)
}

View File

@ -4,13 +4,21 @@ import (
"bytes"
"context"
"io"
"sync"
"sync/atomic"
"time"
"github.com/alcionai/clues"
kjson "github.com/microsoft/kiota-serialization-json-go"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@ -28,14 +36,14 @@ const (
type Collection struct {
protectedResource string
items chan data.Item
stream chan data.Item
// added is a list of existing item IDs that were added to a container
added map[string]struct{}
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
// items itemGetterSerializer
getter getChannelMessager
category path.CategoryType
statusUpdater support.StatusUpdater
@ -66,6 +74,7 @@ type Collection struct {
// If both are populated, then state is either moved (if they differ),
// or notMoved (if they match).
func NewCollection(
getter getChannelMessager,
protectedResource string,
curr, prev path.Path,
location *path.Builder,
@ -75,17 +84,18 @@ func NewCollection(
// doNotMergeItems bool,
) Collection {
collection := Collection{
added: make(map[string]struct{}, 0),
added: map[string]struct{}{},
category: category,
ctrl: ctrlOpts,
items: make(chan data.Item, collectionChannelBufferSize),
// doNotMergeItems: doNotMergeItems,
fullPath: curr,
getter: getter,
locationPath: location,
prevPath: prev,
removed: make(map[string]struct{}, 0),
state: data.StateOf(prev, curr),
statusUpdater: statusUpdater,
stream: make(chan data.Item, collectionChannelBufferSize),
protectedResource: protectedResource,
}
@ -95,8 +105,8 @@ func NewCollection(
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
// go col.streamItems(ctx, errs)
return col.items
go col.streamItems(ctx, errs)
return col.stream
}
// FullPath returns the Collection's fullPath []string
@ -136,7 +146,7 @@ type Item struct {
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
message []byte
info *details.ExchangeInfo // temporary change to bring populate function into directory
info *details.GroupsInfo // temporary change to bring populate function into directory
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
// request to provide modtime in ItemInfo structs.
modTime time.Time
@ -158,7 +168,7 @@ func (i Item) Deleted() bool {
}
func (i *Item) Info() details.ItemInfo {
return details.ItemInfo{Exchange: i.info}
return details.ItemInfo{Groups: i.info}
}
func (i *Item) ModTime() time.Time {
@ -168,7 +178,7 @@ func (i *Item) ModTime() time.Time {
func NewItem(
identifier string,
dataBytes []byte,
detail details.ExchangeInfo,
detail details.GroupsInfo,
modTime time.Time,
) Item {
return Item{
@ -178,3 +188,146 @@ func NewItem(
modTime: modTime,
}
}
// ---------------------------------------------------------------------------
// items() production
// ---------------------------------------------------------------------------
func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
var (
streamedItems int64
totalBytes int64
wg sync.WaitGroup
colProgress chan<- struct{}
el = errs.Local()
)
ctx = clues.Add(ctx, "category", col.category.String())
defer func() {
col.finishPopulation(ctx, streamedItems, totalBytes, errs.Failure())
}()
if len(col.added)+len(col.removed) > 0 {
colProgress = observe.CollectionProgress(
ctx,
col.FullPath().Category().String(),
col.LocationPath().Elements())
defer close(colProgress)
}
semaphoreCh := make(chan struct{}, col.ctrl.Parallelism.ItemFetch)
defer close(semaphoreCh)
// TODO: add for v1 with incrementals
// delete all removed items
// for id := range col.removed {
// semaphoreCh <- struct{}{}
// wg.Add(1)
// go func(id string) {
// defer wg.Done()
// defer func() { <-semaphoreCh }()
// col.stream <- &Item{
// id: id,
// modTime: time.Now().UTC(), // removed items have no modTime entry.
// deleted: true,
// }
// atomic.AddInt64(&streamedItems, 1)
// atomic.AddInt64(&totalBytes, 0)
// if colProgress != nil {
// colProgress <- struct{}{}
// }
// }(id)
// }
// add any new items
for id := range col.added {
if el.Failure() != nil {
break
}
wg.Add(1)
semaphoreCh <- struct{}{}
go func(id string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
writer := kjson.NewJsonSerializationWriter()
defer writer.Close()
flds := col.fullPath.Folders()
parentFolderID := flds[len(flds)-1]
item, info, err := col.getter.getChannelMessage(
ctx,
col.protectedResource,
parentFolderID,
id)
if err != nil {
logger.CtxErr(ctx, err).Info("writing channel message to serializer")
return
}
if err := writer.WriteObjectValue("", item); err != nil {
logger.CtxErr(ctx, err).Info("writing channel message to serializer")
return
}
data, err := writer.GetSerializedContent()
if err != nil {
logger.CtxErr(ctx, err).Info("serializing channel message")
return
}
info.ParentPath = col.LocationPath().String()
col.stream <- &Item{
id: ptr.Val(item.GetId()),
message: data,
info: info,
modTime: info.Modified,
}
atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size)
if colProgress != nil {
colProgress <- struct{}{}
}
}(id)
}
wg.Wait()
}
// finishPopulation is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(
ctx context.Context,
streamedItems, totalBytes int64,
err error,
) {
close(col.stream)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(
ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: int(streamedItems),
Bytes: totalBytes,
},
col.FullPath().Folder(false))
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
col.statusUpdater(status)
}

View File

@ -0,0 +1,135 @@
package groups
import (
"bytes"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
)
type CollectionSuite struct {
tester.Suite
}
func TestCollectionSuite(t *testing.T) {
suite.Run(t, &CollectionSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *CollectionSuite) TestReader_Valid() {
m := []byte("test message")
description := "aFile"
ed := &Item{id: description, message: m}
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(ed.ToReader())
assert.NoError(suite.T(), err, clues.ToCore(err))
assert.Equal(suite.T(), buf.Bytes(), m)
assert.Equal(suite.T(), description, ed.ID())
}
func (suite *CollectionSuite) TestReader_Empty() {
var (
empty []byte
expected int64
t = suite.T()
)
ed := &Item{message: empty}
buf := &bytes.Buffer{}
received, err := buf.ReadFrom(ed.ToReader())
assert.Equal(t, expected, received)
assert.NoError(t, err, clues.ToCore(err))
}
func (suite *CollectionSuite) TestCollection_NewCollection() {
t := suite.T()
tenant := "a-tenant"
protectedResource := "a-protectedResource"
folder := "a-folder"
name := "protectedResource"
fullPath, err := path.Build(
tenant,
protectedResource,
path.GroupsService,
path.ChannelMessagesCategory,
false,
folder)
require.NoError(t, err, clues.ToCore(err))
edc := Collection{
protectedResource: name,
fullPath: fullPath,
}
assert.Equal(t, name, edc.protectedResource)
assert.Equal(t, fullPath, edc.FullPath())
}
func (suite *CollectionSuite) TestNewCollection_state() {
fooP, err := path.Build("t", "u", path.GroupsService, path.ChannelMessagesCategory, false, "foo")
require.NoError(suite.T(), err, clues.ToCore(err))
barP, err := path.Build("t", "u", path.GroupsService, path.ChannelMessagesCategory, false, "bar")
require.NoError(suite.T(), err, clues.ToCore(err))
locPB := path.Builder{}.Append("human-readable")
table := []struct {
name string
prev path.Path
curr path.Path
loc *path.Builder
expect data.CollectionState
}{
{
name: "new",
curr: fooP,
loc: locPB,
expect: data.NewState,
},
{
name: "not moved",
prev: fooP,
curr: fooP,
loc: locPB,
expect: data.NotMovedState,
},
{
name: "moved",
prev: fooP,
curr: barP,
loc: locPB,
expect: data.MovedState,
},
{
name: "deleted",
prev: fooP,
expect: data.DeletedState,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
c := NewCollection(
nil,
"g",
test.curr, test.prev, test.loc,
0,
nil,
control.DefaultOptions())
assert.Equal(t, test.expect, c.State(), "collection state")
assert.Equal(t, test.curr, c.fullPath, "full path")
assert.Equal(t, test.prev, c.prevPath, "prev path")
assert.Equal(t, test.loc, c.locationPath, "location path")
})
}
}

View File

@ -6,22 +6,25 @@ import (
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type backupHandler interface {
getChannelMessager
// gets all channels for the group
getChannels(
ctx context.Context,
) ([]models.Channelable, error)
// gets all messages by delta in the channel.
getChannelMessagesDelta(
// gets all message IDs by delta in the channel
getChannelMessageIDsDelta(
ctx context.Context,
channelID, prevDelta string,
) ([]models.ChatMessageable, api.DeltaUpdate, error)
) (map[string]struct{}, api.DeltaUpdate, error)
// includeContainer evaluates whether the channel is included
// in the provided scope.
@ -39,3 +42,10 @@ type backupHandler interface {
tenantID string,
) (path.Path, error)
}
type getChannelMessager interface {
getChannelMessage(
ctx context.Context,
teamID, channelID, itemID string,
) (models.ChatMessageable, *details.GroupsInfo, error)
}

View File

@ -861,8 +861,6 @@ func testRestoreNoPermissionsAndBackup(suite oneDriveSuite, startVersion int) {
restoreCfg.OnCollision = control.Replace
restoreCfg.IncludePermissions = false
fmt.Printf("\n-----\nrcfg %+v\n-----\n", restoreCfg.IncludePermissions)
runRestoreBackupTestVersions(
t,
testData,

View File

@ -129,7 +129,7 @@ func makeRestorePathsForEntry(
// * Exchange Email/Contacts
// * OneDrive/SharePoint (needs drive information)
switch true {
case ent.Exchange != nil:
case ent.Exchange != nil || ent.Groups != nil:
// TODO(ashmrtn): Eventually make Events have it's own function to handle
// setting the restore destination properly.
res.RestorePath, err = basicLocationPath(repoRef, locRef)

View File

@ -23,7 +23,7 @@ type GroupsBackupIntgSuite struct {
}
func TestGroupsBackupIntgSuite(t *testing.T) {
t.Skip("enable when groups e2e v0 backup is complete")
t.Skip("todo: enable")
suite.Run(t, &GroupsBackupIntgSuite{
Suite: tester.NewIntegrationSuite(
@ -50,11 +50,13 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
mb = evmock.NewBus()
sel = selectors.NewGroupsBackup([]string{suite.its.group.ID})
opts = control.DefaultOptions()
whatSet = deeTD.CategoryFromRepoRef
)
sel.Include(
selTD.GroupsBackupLibraryFolderScope(sel),
selTD.GroupsBackupChannelScope(sel)) // FIXME: channel backups are not ready
// TODO(abin): ensure implementation succeeds
// selTD.GroupsBackupLibraryFolderScope(sel),
selTD.GroupsBackupChannelScope(sel))
bo, bod := prepNewTestBackupOp(t, ctx, mb, sel.Selector, opts, version.Backup)
defer bod.close(t, ctx)
@ -68,7 +70,27 @@ func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsBasic() {
&bo,
bod.sel,
bod.sel.ID(),
path.LibrariesCategory)
path.ChannelMessagesCategory)
_, expectDeets := deeTD.GetDeetsInBackup(
t,
ctx,
bo.Results.BackupID,
bod.acct.ID(),
bod.sel.ID(),
path.GroupsService,
whatSet,
bod.kms,
bod.sss)
deeTD.CheckBackupDetails(
t,
ctx,
bo.Results.BackupID,
whatSet,
bod.kms,
bod.sss,
expectDeets,
false)
}
func (suite *GroupsBackupIntgSuite) TestBackup_Run_groupsExtensions() {

View File

@ -132,22 +132,22 @@ func prepNewTestBackupOp(
bod.sw = store.NewWrapper(bod.kms)
connectorResource := resource.Users
var connectorResource resource.Category
switch sel.Service {
case selectors.ServiceSharePoint:
switch sel.PathService() {
case path.SharePointService:
connectorResource = resource.Sites
case selectors.ServiceGroups:
case path.GroupsService:
connectorResource = resource.Groups
default:
connectorResource = resource.Users
}
bod.ctrl, bod.sel = ControllerWithSelector(
t,
ctx,
bod.acct,
connectorResource,
sel,
nil,
connectorResource, sel, nil,
bod.close)
bo := newTestBackupOp(
@ -543,12 +543,12 @@ func ControllerWithSelector(
t *testing.T,
ctx context.Context, //revive:disable-line:context-as-argument
acct account.Account,
cr resource.Category,
rc resource.Category,
sel selectors.Selector,
ins idname.Cacher,
onFail func(*testing.T, context.Context),
) (*m365.Controller, selectors.Selector) {
ctrl, err := m365.NewController(ctx, acct, cr, sel.PathService(), control.DefaultOptions())
ctrl, err := m365.NewController(ctx, acct, rc, sel.PathService(), control.DefaultOptions())
if !assert.NoError(t, err, clues.ToCore(err)) {
if onFail != nil {
onFail(t, ctx)

View File

@ -159,6 +159,10 @@ func (de Entry) Headers() []string {
hs = append(hs, de.ItemInfo.OneDrive.Headers()...)
}
if de.ItemInfo.Groups != nil {
hs = append(hs, de.ItemInfo.Groups.Headers()...)
}
return hs
}
@ -182,5 +186,9 @@ func (de Entry) Values() []string {
vs = append(vs, de.ItemInfo.OneDrive.Values()...)
}
if de.ItemInfo.Groups != nil {
vs = append(vs, de.ItemInfo.Groups.Values()...)
}
return vs
}

View File

@ -47,8 +47,6 @@ type GroupsInfo struct {
Size int64 `json:"size,omitempty"`
// Channels Specific
ChannelName string `json:"channelName,omitempty"`
ChannelID string `json:"channelID,omitempty"`
LastReplyAt time.Time `json:"lastResponseAt,omitempty"`
MessageCreator string `json:"messageCreator,omitempty"`
MessagePreview string `json:"messagePreview,omitempty"`
@ -67,7 +65,7 @@ func (i GroupsInfo) Headers() []string {
switch i.ItemType {
case SharePointLibrary:
return []string{"ItemName", "Library", "ParentPath", "Size", "Owner", "Created", "Modified"}
case TeamsChannelMessage:
case GroupsChannelMessage:
return []string{"Message", "Channel", "Replies", "Creator", "Created", "Last Response"}
}
@ -88,10 +86,10 @@ func (i GroupsInfo) Values() []string {
dttm.FormatToTabularDisplay(i.Created),
dttm.FormatToTabularDisplay(i.Modified),
}
case TeamsChannelMessage:
case GroupsChannelMessage:
return []string{
i.MessagePreview,
i.ChannelName,
i.ParentPath,
strconv.Itoa(i.ReplyCount),
i.MessageCreator,
dttm.FormatToTabularDisplay(i.Created),
@ -103,22 +101,25 @@ func (i GroupsInfo) Values() []string {
}
func (i *GroupsInfo) UpdateParentPath(newLocPath *path.Builder) {
i.ParentPath = newLocPath.PopFront().String()
i.ParentPath = newLocPath.String()
}
func (i *GroupsInfo) uniqueLocation(baseLoc *path.Builder) (*uniqueLoc, error) {
var category path.CategoryType
var (
loc uniqueLoc
err error
)
switch i.ItemType {
case SharePointLibrary:
category = path.LibrariesCategory
if len(i.DriveID) == 0 {
return nil, clues.New("empty drive ID")
}
}
loc, err := NewGroupsLocationIDer(category, i.DriveID, baseLoc.Elements()...)
loc, err = NewGroupsLocationIDer(path.LibrariesCategory, i.DriveID, baseLoc.Elements()...)
case GroupsChannelMessage:
loc, err = NewGroupsLocationIDer(path.ChannelMessagesCategory, "", baseLoc.Elements()...)
}
return &loc, err
}

View File

@ -39,7 +39,7 @@ const (
FolderItem ItemType = 306
// Groups/Teams(40x)
TeamsChannelMessage ItemType = 401
GroupsChannelMessage ItemType = 401
)
func UpdateItem(item *ItemInfo, newLocPath *path.Builder) {
@ -59,6 +59,8 @@ func UpdateItem(item *ItemInfo, newLocPath *path.Builder) {
item.SharePoint.UpdateParentPath(newLocPath)
} else if item.OneDrive != nil {
item.OneDrive.UpdateParentPath(newLocPath)
} else if item.Groups != nil {
item.Groups.UpdateParentPath(newLocPath)
}
}
@ -93,6 +95,9 @@ func (i ItemInfo) infoType() ItemType {
case i.OneDrive != nil:
return i.OneDrive.ItemType
case i.Groups != nil:
return i.Groups.ItemType
}
return UnknownType
@ -109,6 +114,9 @@ func (i ItemInfo) size() int64 {
case i.SharePoint != nil:
return i.SharePoint.Size
case i.Groups != nil:
return i.Groups.Size
case i.Folder != nil:
return i.Folder.Size
}
@ -127,6 +135,9 @@ func (i ItemInfo) Modified() time.Time {
case i.SharePoint != nil:
return i.SharePoint.Modified
case i.Groups != nil:
return i.Groups.Modified
case i.Folder != nil:
return i.Folder.Modified
}

View File

@ -409,7 +409,6 @@ const (
// channel and drive selection
GroupsInfoSiteLibraryDrive groupsCategory = "GroupsInfoSiteLibraryDrive"
GroupsInfoChannel groupsCategory = "GroupsInfoChannel"
// data contained within details.ItemInfo
GroupsInfoChannelMessageCreatedAfter groupsCategory = "GroupsInfoChannelMessageCreatedAfter"
@ -672,18 +671,6 @@ func (s GroupsScope) matchesInfo(dii details.ItemInfo) bool {
}
return matchesAny(s, GroupsInfoSiteLibraryDrive, ds)
case GroupsInfoChannel:
ds := []string{}
if len(info.ChannelID) > 0 {
ds = append(ds, info.ChannelID)
}
if len(info.ChannelName) > 0 {
ds = append(ds, info.ChannelName)
}
return matchesAny(s, GroupsInfoChannel, ds)
case GroupsInfoChannelMessageCreator:
i = info.MessageCreator
case GroupsInfoChannelMessageCreatedAfter, GroupsInfoChannelMessageCreatedBefore:

View File

@ -159,7 +159,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() {
LocationRef: strings.Join(itemElems1, "/"),
ItemInfo: details.ItemInfo{
Groups: &details.GroupsInfo{
ItemType: details.TeamsChannelMessage,
ItemType: details.GroupsChannelMessage,
ParentPath: strings.Join(itemElems1, "/"),
},
},
@ -170,7 +170,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() {
// ItemRef intentionally blank to test fallback case
ItemInfo: details.ItemInfo{
Groups: &details.GroupsInfo{
ItemType: details.TeamsChannelMessage,
ItemType: details.GroupsChannelMessage,
ParentPath: strings.Join(itemElems2, "/"),
},
},
@ -181,7 +181,7 @@ func (suite *GroupsSelectorSuite) TestGroupsRestore_Reduce() {
LocationRef: strings.Join(itemElems3, "/"),
ItemInfo: details.ItemInfo{
Groups: &details.GroupsInfo{
ItemType: details.TeamsChannelMessage,
ItemType: details.GroupsChannelMessage,
ParentPath: strings.Join(itemElems3, "/"),
},
},
@ -366,7 +366,7 @@ func (suite *GroupsSelectorSuite) TestGroupsScope_MatchesInfo() {
now = time.Now()
modification = now.Add(15 * time.Minute)
future = now.Add(45 * time.Minute)
dtch = details.TeamsChannelMessage
dtch = details.GroupsChannelMessage
)
table := []struct {

View File

@ -3,16 +3,16 @@ package api
import (
"context"
"fmt"
"time"
"github.com/alcionai/clues"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/teams"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
)
@ -104,12 +104,10 @@ func (c Channels) GetChannelByName(
// message
// ---------------------------------------------------------------------------
// GetMessage retrieves a ChannelMessage item.
func (c Channels) GetMessage(
func (c Channels) GetChannelMessage(
ctx context.Context,
teamID, channelID, itemID string,
errs *fault.Bus,
) (serialization.Parsable, *details.GroupsInfo, error) {
teamID, channelID, messageID string,
) (models.ChatMessageable, *details.GroupsInfo, error) {
var size int64
message, err := c.Stable.
@ -119,60 +117,68 @@ func (c Channels) GetMessage(
Channels().
ByChannelIdString(channelID).
Messages().
ByChatMessageIdString(itemID).
ByChatMessageIdString(messageID).
Get(ctx, nil)
if err != nil {
return nil, nil, graph.Stack(ctx, err)
}
return message, ChannelMessageInfo(message, size), nil
}
info := ChannelMessageInfo(message, size)
// ---------------------------------------------------------------------------
// replies
// ---------------------------------------------------------------------------
// GetReplies retrieves all replies to a Channel Message.
func (c Channels) GetReplies(
ctx context.Context,
teamID, channelID, messageID string,
) (serialization.Parsable, error) {
replies, err := c.Stable.
Client().
Teams().
ByTeamIdString(teamID).
Channels().
ByChannelIdString(channelID).
Messages().
ByChatMessageIdString(messageID).
Replies().
Get(ctx, nil)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return replies, nil
return message, info, nil
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
func ChannelMessageInfo(msg models.ChatMessageable, size int64) *details.GroupsInfo {
created := ptr.Val(msg.GetCreatedDateTime())
func ChannelMessageInfo(
msg models.ChatMessageable,
size int64,
) *details.GroupsInfo {
var (
lastReply time.Time
modTime = ptr.OrNow(msg.GetLastModifiedDateTime())
msgCreator string
)
for _, r := range msg.GetReplies() {
cdt := ptr.Val(r.GetCreatedDateTime())
if cdt.After(lastReply) {
lastReply = cdt
}
}
// if the message hasn't been modified since before the most recent
// reply, set the modified time to the most recent reply. This ensures
// we update the message contents to match changes in replies.
if modTime.Before(lastReply) {
modTime = lastReply
}
from := msg.GetFrom()
switch true {
case from.GetApplication() != nil:
msgCreator = ptr.Val(from.GetApplication().GetDisplayName())
case from.GetDevice() != nil:
msgCreator = ptr.Val(from.GetDevice().GetDisplayName())
case from.GetUser() != nil:
msgCreator = ptr.Val(from.GetUser().GetDisplayName())
}
return &details.GroupsInfo{
ItemType: details.TeamsChannelMessage,
ItemType: details.GroupsChannelMessage,
Created: ptr.Val(msg.GetCreatedDateTime()),
LastReplyAt: lastReply,
Modified: modTime,
MessageCreator: msgCreator,
MessagePreview: str.Preview(ptr.Val(msg.GetBody().GetContent()), 16),
ReplyCount: len(msg.GetReplies()),
Size: size,
Created: created,
Modified: ptr.OrNow(msg.GetLastModifiedDateTime()),
}
}
// ---------------------------------------------------------------------------
// helper funcs
// ---------------------------------------------------------------------------
// CheckIDAndName is a validator that ensures the ID
// and name are populated and not zero valued.
func CheckIDAndName(c models.Channelable) error {

View File

@ -52,6 +52,7 @@ func (p *channelMessageDeltaPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessa
func (c Channels) NewChannelMessageDeltaPager(
teamID, channelID, prevDelta string,
selectProps ...string,
) *channelMessageDeltaPageCtrl {
builder := c.Stable.
Client().
@ -67,9 +68,14 @@ func (c Channels) NewChannelMessageDeltaPager(
}
options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
}
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
return &channelMessageDeltaPageCtrl{
resourceID: teamID,
channelID: channelID,
@ -79,13 +85,18 @@ func (c Channels) NewChannelMessageDeltaPager(
}
}
// GetChannelMessagesDelta fetches a delta of all messages in the channel.
func (c Channels) GetChannelMessagesDelta(
// GetChannelMessageIDsDelta fetches a delta of all messages in the channel.
func (c Channels) GetChannelMessageIDsDelta(
ctx context.Context,
teamID, channelID, prevDelta string,
) ([]models.ChatMessageable, DeltaUpdate, error) {
) (map[string]struct{}, DeltaUpdate, error) {
var (
vs = []models.ChatMessageable{}
vs = map[string]struct{}{}
// select is not currently allowed on messages
// this func will still isolate to the ID, however,
// because we need the follow-up get request to gather
// all replies to the message.
// selectProps = idAnd()
pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta)
invalidPrevDelta = len(prevDelta) == 0
newDeltaLink string
@ -98,7 +109,7 @@ func (c Channels) GetChannelMessagesDelta(
logger.Ctx(ctx).Infow("Invalid previous delta", "delta_link", prevDelta)
invalidPrevDelta = true
vs = []models.ChatMessageable{}
vs = map[string]struct{}{}
pager.Reset(ctx)
@ -114,7 +125,9 @@ func (c Channels) GetChannelMessagesDelta(
return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response")
}
vs = append(vs, vals...)
for _, v := range vals {
vs[ptr.Val(v.GetId())] = struct{}{}
}
nextLink, deltaLink := NextAndDeltaLink(page)
@ -139,6 +152,109 @@ func (c Channels) GetChannelMessagesDelta(
return vs, du, nil
}
// ---------------------------------------------------------------------------
// channel message replies pager
// ---------------------------------------------------------------------------
var _ Pager[models.ChatMessageable] = &channelMessageRepliesPageCtrl{}
type channelMessageRepliesPageCtrl struct {
gs graph.Servicer
builder *teams.ItemChannelsItemMessagesItemRepliesRequestBuilder
options *teams.ItemChannelsItemMessagesItemRepliesRequestBuilderGetRequestConfiguration
}
func (p *channelMessageRepliesPageCtrl) SetNext(nextLink string) {
p.builder = teams.NewItemChannelsItemMessagesItemRepliesRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *channelMessageRepliesPageCtrl) GetPage(
ctx context.Context,
) (PageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (p *channelMessageRepliesPageCtrl) GetOdataNextLink() *string {
return ptr.To("")
}
func (p *channelMessageRepliesPageCtrl) ValuesIn(l PageLinker) ([]models.ChatMessageable, error) {
return getValues[models.ChatMessageable](l)
}
func (c Channels) NewChannelMessageRepliesPager(
teamID, channelID, messageID string,
selectProps ...string,
) *channelMessageRepliesPageCtrl {
options := &teams.ItemChannelsItemMessagesItemRepliesRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
}
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
res := &channelMessageRepliesPageCtrl{
gs: c.Stable,
options: options,
builder: c.Stable.
Client().
Teams().
ByTeamIdString(teamID).
Channels().
ByChannelIdString(channelID).
Messages().
ByChatMessageIdString(messageID).
Replies(),
}
return res
}
// GetChannels fetches the minimum valuable data from each reply in the message
func (c Channels) GetChannelMessageReplies(
ctx context.Context,
teamID, channelID, messageID string,
) ([]models.ChatMessageable, error) {
var (
vs = []models.ChatMessageable{}
// select is not currently enabled for replies.
// selectProps = idAnd(
// "messageType",
// "createdDateTime",
// "from",
// "body")
pager = c.NewChannelMessageRepliesPager(teamID, channelID, messageID)
)
// Loop through all pages returned by Graph API.
for {
page, err := pager.GetPage(ctx)
if err != nil {
return nil, graph.Wrap(ctx, err, "retrieving page of channels")
}
vals, err := pager.ValuesIn(page)
if err != nil {
return nil, graph.Wrap(ctx, err, "extracting channels from response")
}
vs = append(vs, vals...)
nextLink := ptr.Val(page.GetOdataNextLink())
if len(nextLink) == 0 {
break
}
pager.SetNext(nextLink)
}
logger.Ctx(ctx).Debugf("retrieved %d channel message replies", len(vs))
return vs, nil
}
// ---------------------------------------------------------------------------
// channel pager
// ---------------------------------------------------------------------------

View File

@ -2,13 +2,18 @@ package api_test
import (
"testing"
"time"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type ChannelsPagerIntgSuite struct {
@ -51,23 +56,79 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() {
ctx, flush := tester.NewContext(t)
defer flush()
msgs, du, err := ac.GetChannelMessagesDelta(
msgIDs, du, err := ac.GetChannelMessageIDsDelta(
ctx,
suite.its.group.id,
suite.its.group.testContainerID,
"")
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, msgs)
require.NotEmpty(t, msgIDs)
require.NotZero(t, du.URL, "delta link")
require.True(t, du.Reset, "reset due to empty prev delta link")
msgs, du, err = ac.GetChannelMessagesDelta(
msgIDs, du, err = ac.GetChannelMessageIDsDelta(
ctx,
suite.its.group.id,
suite.its.group.testContainerID,
du.URL)
require.NoError(t, err, clues.ToCore(err))
require.Empty(t, msgs, "should have no new messages from delta")
require.Empty(t, msgIDs, "should have no new messages from delta")
require.NotZero(t, du.URL, "delta link")
require.False(t, du.Reset, "prev delta link should be valid")
for id := range msgIDs {
suite.Run(id+"-replies", func() {
testEnumerateChannelMessageReplies(
suite.T(),
suite.its.ac.Channels(),
suite.its.group.id,
suite.its.group.testContainerID,
id)
})
}
}
func testEnumerateChannelMessageReplies(
t *testing.T,
ac api.Channels,
groupID, channelID, messageID string,
) {
ctx, flush := tester.NewContext(t)
defer flush()
msg, info, err := ac.GetChannelMessage(ctx, groupID, channelID, messageID)
require.NoError(t, err, clues.ToCore(err))
replies, err := ac.GetChannelMessageReplies(ctx, groupID, channelID, messageID)
require.NoError(t, err, clues.ToCore(err))
var (
lastReply time.Time
replyIDs = map[string]struct{}{}
)
for _, r := range replies {
cdt := ptr.Val(r.GetCreatedDateTime())
if cdt.After(lastReply) {
lastReply = cdt
}
replyIDs[ptr.Val(r.GetId())] = struct{}{}
}
assert.Equal(t, messageID, ptr.Val(msg.GetId()))
assert.Equal(t, channelID, ptr.Val(msg.GetChannelIdentity().GetChannelId()))
assert.Equal(t, groupID, ptr.Val(msg.GetChannelIdentity().GetTeamId()))
assert.Equal(t, len(replies), info.ReplyCount)
assert.Equal(t, msg.GetFrom().GetUser().GetDisplayName(), info.MessageCreator)
assert.Equal(t, lastReply, info.LastReplyAt)
assert.Equal(t, str.Preview(ptr.Val(msg.GetBody().GetContent()), 16), info.MessagePreview)
msgReplyIDs := map[string]struct{}{}
for _, reply := range msg.GetReplies() {
msgReplyIDs[ptr.Val(reply.GetId())] = struct{}{}
}
assert.Equal(t, replyIDs, msgReplyIDs)
}

View File

@ -87,7 +87,13 @@ func newEventualConsistencyHeaders() *abstractions.RequestHeaders {
// makes a slice with []string{"id", s...}
func idAnd(ss ...string) []string {
return append([]string{"id"}, ss...)
id := []string{"id"}
if len(ss) == 0 {
return id
}
return append(id, ss...)
}
// ---------------------------------------------------------------------------

View File

@ -212,9 +212,10 @@ func (c Events) GetItem(
// cancelledOccurrences end up in AdditionalData
// https://learn.microsoft.com/en-us/graph/api/resources/event?view=graph-rest-beta#properties
rawURL := fmt.Sprintf(eventExceptionsBetaURLTemplate, userID, itemID)
builder := users.NewItemEventsEventItemRequestBuilder(rawURL, c.Stable.Adapter())
event, err = builder.Get(ctx, config)
event, err = users.
NewItemEventsEventItemRequestBuilder(rawURL, c.Stable.Adapter()).
Get(ctx, config)
if err != nil {
return nil, nil, graph.Stack(ctx, err)
}