boilerplate teamschat collection package

This seems like a lot of code, but 95% of it is boilerplate copied
from other, similar collection packages.  Some noteworthy differences:
* teamsChats does not handle metadata, so all metadata, delta, and previous
path handling was removed
* teamsChats does not produce tombstones
* chats are never deleted, so no "removed" items are tracked
* all chats get stored at the prefix root, so no "containers" are iterated, and therefore only one collection is ever produced.

This means that, overall, the boilerplate here is much reduced compared to
boilerplate in similar packages.
This commit is contained in:
ryanfkeepers 2024-01-22 16:46:53 -07:00
parent c999a3072f
commit a17ed85220
12 changed files with 1689 additions and 1 deletions

View File

@ -0,0 +1,166 @@
package teamschats
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// CreateCollections produces the backup collections for the teams chats
// category selected by scope.  The result always holds two entries on
// success: the collection of chats, followed by a metadata collection that
// contains empty previous-path and delta-url files.
//
// The boolean result reports whether previous backups may be used as a base.
// No deltas are involved in this category, so it is true whenever the call
// succeeds.
func CreateCollections[I chatsItemer](
	ctx context.Context,
	bpc inject.BackupProducerConfig,
	bh backupHandler[I],
	tenantID string,
	scope selectors.TeamsChatsScope,
	statusUpdater support.StatusUpdater,
	useLazyReader bool,
	counter *count.Bus,
	errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
	qp := graph.QueryParams{
		Category:          scope.Category().PathType(),
		ProtectedResource: bpc.ProtectedResource,
		TenantID:          tenantID,
	}

	root, err := bh.getContainer(ctx, api.CallConfig{CanMakeDeltaQueries: false})
	if err != nil {
		return nil, false, clues.Stack(err)
	}

	counter.Add(count.Containers, 1)

	chats, err := populateCollection[I](
		ctx,
		qp,
		bh,
		statusUpdater,
		root,
		scope,
		useLazyReader,
		bpc.Options,
		counter,
		errs)
	if err != nil {
		return nil, false, clues.Wrap(err, "filling collections")
	}

	prefix, err := path.BuildMetadata(
		qp.TenantID,
		qp.ProtectedResource.ID(),
		path.TeamsChatsService,
		qp.Category,
		false)
	if err != nil {
		return nil, false, clues.WrapWC(ctx, err, "making metadata path prefix").
			Label(count.BadPathPrefix)
	}

	// no deltas or previousPaths are used here; we store empty files instead
	entries := []graph.MetadataCollectionEntry{
		graph.NewMetadataEntry(metadata.PreviousPathFileName, map[string]string{}),
		graph.NewMetadataEntry(metadata.DeltaURLsFileName, map[string]string{}),
	}

	mdColl, err := graph.MakeMetadataCollection(
		prefix,
		entries,
		statusUpdater,
		counter.Local())
	if err != nil {
		return nil, false, clues.WrapWC(ctx, err, "making metadata collection")
	}

	// no deltas involved in this category, so canUsePrevBackups is always true.
	return []data.BackupCollection{chats, mdColl}, true, nil
}
// populateCollection enumerates the chats available to the handler, keeps the
// ones that the scope selects, and wraps them in a single backup collection
// located at the handler's canonical path.
//
// If the chats cannot be enumerated, or the canonical path cannot be built,
// the error is recorded on errs and also returned, and the collection is nil.
// A nil collection is never returned alongside a nil error, regardless of the
// failure handling policy configured on errs.
//
// On success, the returned error is the current failure on errs, if any.
func populateCollection[I chatsItemer](
	ctx context.Context,
	qp graph.QueryParams,
	bh backupHandler[I],
	statusUpdater support.StatusUpdater,
	container container[I],
	scope selectors.TeamsChatsScope,
	useLazyReader bool,
	ctrlOpts control.Options,
	counter *count.Bus,
	errs *fault.Bus,
) (data.BackupCollection, error) {
	cl := counter.Local()
	ctx = clues.AddLabelCounter(ctx, cl.PlainAdder())

	cc := api.CallConfig{
		CanMakeDeltaQueries: false,
	}

	items, err := bh.getItemIDs(ctx, cc)
	if err != nil {
		err = clues.Stack(err)
		errs.AddRecoverable(ctx, err)

		// There is only one collection to produce.  Without its items we have
		// nothing to hand back, so the caller must always see the error; a
		// recoverable-only bus would otherwise report no failure here.
		return nil, err
	}

	// Only include the chats that match the scope.
	includedItems := make([]I, 0, len(items))

	for _, item := range items {
		if !bh.includeItem(item, scope) {
			cl.Inc(count.SkippedItems)
			continue
		}

		includedItems = append(includedItems, item)
	}

	cl.Add(count.ItemsAdded, int64(len(includedItems)))
	cl.Add(count.ItemsRemoved, 0)

	p, err := bh.canonicalPath()
	if err != nil {
		err = clues.StackWC(ctx, err).Label(count.BadCollPath)
		errs.AddRecoverable(ctx, err)

		// Same reasoning as above: no path means no collection.
		return nil, err
	}

	// The current and previous paths are identical, so the collection is
	// constructed with matching curr and prev paths.
	collection := NewCollection(
		data.NewBaseCollection(
			p,
			p,
			container.humanLocation.Builder(),
			ctrlOpts,
			false,
			cl),
		bh,
		qp.ProtectedResource.ID(),
		includedItems,
		container,
		statusUpdater,
		useLazyReader)

	return collection, clues.Stack(errs.Failure()).OrNil()
}

View File

@ -0,0 +1,366 @@
package teamschats
import (
"context"
"testing"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
inMock "github.com/alcionai/corso/src/internal/common/idname/mock"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/teamschats/testdata"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/errs/core"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// ---------------------------------------------------------------------------
// mocks
// ---------------------------------------------------------------------------
// compile-time check that the mock satisfies the chats backupHandler interface.
var _ backupHandler[models.Chatable] = &mockBackupHandler{}
// mockBackupHandler is a canned-response backupHandler used by the unit tests.
//
//lint:ignore U1000 false linter issue due to generics
type mockBackupHandler struct {
// chatsErr is the error returned by getItemIDs.
chatsErr error
// chats are the chats returned by getItemIDs.
chats []models.Chatable
// NOTE(review): chatMessagesErr is not read by any method visible here.
chatMessagesErr error
// chatMessages are attached to the chat built by getItem, keyed by item ID.
chatMessages map[string][]models.ChatMessageable
// info is the item info returned by getItem, keyed by item ID.
info map[string]*details.TeamsChatsInfo
// getMessageErr is the error returned by getItem, keyed by item ID.
getMessageErr map[string]error
// doNotInclude makes includeItem reject every item when true.
doNotInclude bool
}
// augmentItemInfo exists for interface compliance; the mock adds nothing to
// the item info.
//
//lint:ignore U1000 false linter issue due to generics
func (bh mockBackupHandler) augmentItemInfo(
	_ *details.TeamsChatsInfo,
	_ models.Chatable,
) {
	// intentionally empty
}
// container hands back the shared chat container for use in tests.
func (bh mockBackupHandler) container() container[models.Chatable] {
	root := chatContainer()

	return root
}
// getContainer always succeeds with the shared chat container.  The context
// and call config are ignored.
//
//lint:ignore U1000 required for interface compliance
func (bh mockBackupHandler) getContainer(
	_ context.Context,
	_ api.CallConfig,
) (container[models.Chatable], error) {
	root := chatContainer()

	return root, nil
}
// getItemIDs echoes the chats and the enumeration error configured on the
// mock.  Both values are returned together, even when the error is non-nil.
func (bh mockBackupHandler) getItemIDs(
	context.Context,
	api.CallConfig,
) ([]models.Chatable, error) {
	chats, err := bh.chats, bh.chatsErr

	return chats, err
}
// includeItem accepts every item unless the mock was configured with
// doNotInclude.  The item and scope are not inspected.
//
//lint:ignore U1000 required for interface compliance
func (bh mockBackupHandler) includeItem(
	_ models.Chatable,
	_ selectors.TeamsChatsScope,
) bool {
	include := !bh.doNotInclude

	return include
}
// canonicalPath builds the chats prefix path from fixed tenant and protected
// resource identifiers.
func (bh mockBackupHandler) canonicalPath() (path.Path, error) {
	const (
		tenant   = "tenant"
		resource = "protectedResource"
	)

	return path.BuildPrefix(tenant, resource, path.TeamsChatsService, path.ChatsCategory)
}
// getItem fabricates a chat whose ID and topic both equal itemID, attaches the
// messages configured for that ID, and returns it together with the info and
// error configured for that ID.
//
//lint:ignore U1000 false linter issue due to generics
func (bh mockBackupHandler) getItem(
	_ context.Context,
	_ string,
	itemID string,
) (models.Chatable, *details.TeamsChatsInfo, error) {
	messages := bh.chatMessages[itemID]

	chat := models.NewChat()
	chat.SetId(ptr.To(itemID))
	chat.SetTopic(ptr.To(itemID))
	chat.SetMessages(messages)

	info, err := bh.info[itemID], bh.getMessageErr[itemID]

	return chat, info, err
}
// ---------------------------------------------------------------------------
// Unit Suite
// ---------------------------------------------------------------------------
// BackupUnitSuite holds the unit tests for populating teams chats backup
// collections.
type BackupUnitSuite struct {
tester.Suite
// creds is the fake M365 account config populated by SetupSuite.
creds account.M365Config
}
func TestServiceIteratorsUnitSuite(t *testing.T) {
suite.Run(t, &BackupUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// SetupSuite stores the config of a fake M365 account on the suite, failing
// the suite if the config cannot be produced.
func (suite *BackupUnitSuite) SetupSuite() {
	t := suite.T()
	acct := tconfig.NewFakeM365Account(t)

	cfg, err := acct.M365Config()
	require.NoError(t, err, clues.ToCore(err))

	suite.creds = cfg
}
// TestPopulateCollections runs populateCollection against a mocked handler.
// Whenever chat enumeration succeeds, it expects a non-nil, non-metadata
// collection in the not-moved state, even if no chats are included.  Whenever
// enumeration fails under a fail-fast fault bus, it expects an error and no
// collection.
func (suite *BackupUnitSuite) TestPopulateCollections() {
	var (
		qp = graph.QueryParams{
			Category:          path.ChatsCategory, // doesn't matter which one we use.
			ProtectedResource: inMock.NewProvider("user_id", "user_name"),
			TenantID:          suite.creds.AzureTenantID,
		}
		statusUpdater = func(*support.ControllerOperationStatus) {}
	)

	table := []struct {
		name       string
		mock       mockBackupHandler
		expectErr  require.ErrorAssertionFunc
		expectColl require.ValueAssertionFunc
	}{
		{
			name: "happy path, one chat",
			mock: mockBackupHandler{
				chats: testdata.StubChats("one"),
				chatMessages: map[string][]models.ChatMessageable{
					"one": testdata.StubChatMessages("msg-one"),
				},
			},
			expectErr:  require.NoError,
			expectColl: require.NotNil,
		},
		{
			name: "happy path, many chats",
			mock: mockBackupHandler{
				chats: testdata.StubChats("one", "two"),
				chatMessages: map[string][]models.ChatMessageable{
					"one": testdata.StubChatMessages("msg-one"),
					"two": testdata.StubChatMessages("msg-two"),
				},
			},
			expectErr:  require.NoError,
			expectColl: require.NotNil,
		},
		{
			name: "no chats pass scope",
			mock: mockBackupHandler{
				chats:        testdata.StubChats("one"),
				doNotInclude: true,
			},
			expectErr:  require.NoError,
			expectColl: require.NotNil,
		},
		{
			name:       "no chats",
			mock:       mockBackupHandler{},
			expectErr:  require.NoError,
			expectColl: require.NotNil,
		},
		{
			name: "no chat messages",
			mock: mockBackupHandler{
				chats: testdata.StubChats("one"),
			},
			expectErr:  require.NoError,
			expectColl: require.NotNil,
		},
		{
			name: "err: deleted in flight",
			mock: mockBackupHandler{
				chats:    testdata.StubChats("one"),
				chatsErr: core.ErrNotFound,
			},
			expectErr:  require.Error,
			expectColl: require.Nil,
		},
		{
			name: "err enumerating chats",
			mock: mockBackupHandler{
				chats:    testdata.StubChats("one"),
				chatsErr: assert.AnError,
			},
			expectErr:  require.Error,
			expectColl: require.Nil,
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			ctrlOpts := control.Options{FailureHandling: control.FailFast}

			result, err := populateCollection(
				ctx,
				qp,
				test.mock,
				statusUpdater,
				test.mock.container(),
				selectors.NewTeamsChatsBackup(nil).Chats(selectors.Any())[0],
				false,
				ctrlOpts,
				count.New(),
				fault.New(true))
			test.expectErr(t, err, clues.ToCore(err))
			test.expectColl(t, result)

			if err != nil || result == nil {
				return
			}

			// collection assertions; testify expects (expected, actual), which
			// keeps failure output labelled correctly.
			assert.NotEqual(
				t,
				path.TeamsChatsMetadataService,
				result.FullPath().Service(),
				"should not contain metadata collections")
			assert.NotEqual(t, data.DeletedState, result.State(), "no tombstones should be produced")
			assert.Equal(t, data.NotMovedState, result.State())
			assert.False(t, result.DoNotMergeItems(), "doNotMergeItems should always be false")
		})
	}
}
// ---------------------------------------------------------------------------
// Integration tests
// ---------------------------------------------------------------------------
// BackupIntgSuite holds the integration tests for producing teams chats
// backup collections.
type BackupIntgSuite struct {
tester.Suite
// resource is the protected resource ID read from the test config in SetupSuite.
resource string
// tenantID is the Azure tenant ID taken from the account credentials.
tenantID string
// ac is the api client created in SetupSuite.
ac api.Client
}
func TestBackupIntgSuite(t *testing.T) {
suite.Run(t, &BackupIntgSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tconfig.M365AcctCredEnvs}),
})
}
// SetupSuite initializes the graph concurrency limiter, then populates the
// suite with the protected resource ID, an api client, and the tenant ID.
func (suite *BackupIntgSuite) SetupSuite() {
	t := suite.T()

	ctx, flush := tester.NewContext(t)
	defer flush()

	graph.InitializeConcurrencyLimiter(ctx, true, 4)

	suite.resource = tconfig.M365TeamID(t)

	acct := tconfig.NewM365Account(t)

	creds, err := acct.M365Config()
	require.NoError(t, err, clues.ToCore(err))

	client, err := api.NewClient(
		creds,
		control.DefaultOptions(),
		count.New())
	suite.ac = client

	require.NoError(t, err, clues.ToCore(err))

	suite.tenantID = creds.AzureTenantID
}
// TestCreateCollections runs CreateCollections with the real chats handler.
// It expects exactly two collections: the chats collection, whose folder and
// location are both empty, and the metadata collection.
func (suite *BackupIntgSuite) TestCreateCollections() {
	var (
		tenant            = tconfig.M365TenantID(suite.T())
		protectedResource = tconfig.M365TeamID(suite.T())
		resources         = []string{protectedResource}
		handler           = NewUsersChatsBackupHandler(tenant, protectedResource, suite.ac.Chats())
		noopUpdater       = func(status *support.ControllerOperationStatus) {}
	)

	table := []struct {
		name      string
		scope     selectors.TeamsChatsScope
		chatNames map[string]struct{}
	}{
		{
			name:      "chat messages",
			scope:     selTD.TeamsChatsBackupChatScope(selectors.NewTeamsChatsBackup(resources))[0],
			chatNames: map[string]struct{}{selTD.TestChatTopic: {}},
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			opts := control.DefaultOptions()

			sel := selectors.NewTeamsChatsBackup([]string{protectedResource})
			sel.Include(selTD.TeamsChatsBackupChatScope(sel))

			bpc := inject.BackupProducerConfig{
				LastBackupVersion: version.NoBackup,
				Options:           opts,
				ProtectedResource: inMock.NewProvider(protectedResource, protectedResource),
				Selector:          sel.Selector,
			}

			colls, _, err := CreateCollections(
				ctx,
				bpc,
				handler,
				suite.tenantID,
				test.scope,
				noopUpdater,
				false,
				count.New(),
				fault.New(true))
			require.NoError(t, err, clues.ToCore(err))
			require.NotEmpty(t, colls, "must have at least one collection")

			for _, c := range colls {
				// the metadata collection is exempt from the location checks.
				if c.FullPath().Service() == path.TeamsChatsMetadataService {
					continue
				}

				require.Empty(t, c.FullPath().Folder(false), "all items should be stored at the root")

				if locp, ok := c.(data.LocationPather); ok {
					require.Empty(t, locp.LocationPath().String(), "no items should have locations")
				}
			}

			assert.Len(t, colls, 2, "should have the root folder collection and metadata collection")
		})
	}
}

View File

@ -0,0 +1,95 @@
package teamschats
import (
"context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
// compile-time check that the handler satisfies the chats backupHandler interface.
var _ backupHandler[models.Chatable] = &usersChatsBackupHandler{}
// usersChatsBackupHandler is the backupHandler implementation backed by the
// chats api client.
type usersChatsBackupHandler struct {
// ac is the chats api client used to enumerate chats.
ac api.Chats
// protectedResourceID identifies the resource whose chats are enumerated;
// it is also used when building the canonical path.
protectedResourceID string
// tenantID is used when building the canonical path.
tenantID string
}
// NewUsersChatsBackupHandler builds a chats backup handler bound to the given
// tenant, protected resource, and chats api client.
func NewUsersChatsBackupHandler(
	tenantID, protectedResourceID string,
	ac api.Chats,
) usersChatsBackupHandler {
	var bh usersChatsBackupHandler

	bh.tenantID = tenantID
	bh.protectedResourceID = protectedResourceID
	bh.ac = ac

	return bh
}
// getContainer returns the lone chat container.  Chats have no containers of
// their own: everything is stored at the root, so no lookup is performed and
// the call never fails.
//
//lint:ignore U1000 required for interface compliance
func (bh usersChatsBackupHandler) getContainer(
	ctx context.Context,
	_ api.CallConfig,
) (container[models.Chatable], error) {
	root := chatContainer()

	return root, nil
}
// getItemIDs asks the chats api client for the chats of the protected
// resource, using the provided call config.
//
//lint:ignore U1000 required for interface compliance
func (bh usersChatsBackupHandler) getItemIDs(
	ctx context.Context,
	cc api.CallConfig,
) ([]models.Chatable, error) {
	chats, err := bh.ac.GetChats(ctx, bh.protectedResourceID, cc)

	return chats, err
}
// includeItem reports whether the scope matches the chat's topic.  A chat
// without a topic is matched using the empty string.
//
//lint:ignore U1000 required for interface compliance
func (bh usersChatsBackupHandler) includeItem(
	ch models.Chatable,
	scope selectors.TeamsChatsScope,
) bool {
	topic := ptr.Val(ch.GetTopic())

	return scope.Matches(selectors.TeamsChatsChat, topic)
}
// canonicalPath builds the chats prefix path for the handler's tenant and
// protected resource.
func (bh usersChatsBackupHandler) canonicalPath() (path.Path, error) {
	tenant, resource := bh.tenantID, bh.protectedResourceID

	return path.BuildPrefix(tenant, resource, path.TeamsChatsService, path.ChatsCategory)
}
// getItem is a placeholder: it always fails with a "not implemented" error
// and returns neither a chat nor its info.
//
//lint:ignore U1000 false linter issue due to generics
func (bh usersChatsBackupHandler) getItem(
	ctx context.Context,
	userID string,
	chatID string,
) (models.Chatable, *details.TeamsChatsInfo, error) {
	// FIXME: should retrieve and populate all messages in the chat.
	err := clues.New("not implemented")

	return nil, nil, err
}
// augmentItemInfo exists for interface compliance.  It makes no changes to
// the item info.
//
//lint:ignore U1000 false linter issue due to generics
func (bh usersChatsBackupHandler) augmentItemInfo(dgi *details.TeamsChatsInfo, c models.Chatable) {
	// nothing to add
}
// chatContainer produces the container used for all chats.  Both its storage
// folders and its human-readable location are empty, non-nil element sets.
func chatContainer() container[models.Chatable] {
	var root container[models.Chatable]

	root.storageDirFolders = path.Elements{}
	root.humanLocation = path.Elements{}

	return root
}

View File

@ -0,0 +1,418 @@
package teamschats
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/alcionai/clues"
kjson "github.com/microsoft/kiota-serialization-json-go"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/errs/core"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// compile-time checks that both collection flavors satisfy data.BackupCollection.
var (
_ data.BackupCollection = &prefetchCollection[chatsItemer]{}
_ data.BackupCollection = &lazyFetchCollection[chatsItemer]{}
)
const (
// collectionChannelBufferSize is the buffer capacity given to the item
// stream of every collection built by NewCollection.
collectionChannelBufferSize = 1000
// NOTE(review): numberOfRetries is not referenced by the code visible in
// this file; confirm whether it is still needed.
numberOfRetries = 4
)
// updateStatus assembles the backup status for a single collection out of its
// attempted, streamed, and byte tallies, logs the status at debug level, and
// then hands it to statusUpdater.
//
// NOTE(review): err is accepted but is not read anywhere in this function.
func updateStatus(
	ctx context.Context,
	statusUpdater support.StatusUpdater,
	attempted int,
	streamedItems int64,
	totalBytes int64,
	folderPath string,
	err error,
) {
	metrics := support.CollectionMetrics{
		Objects:   attempted,
		Successes: int(streamedItems),
		Bytes:     totalBytes,
	}

	// the status always describes exactly one folder.
	status := support.CreateStatus(ctx, support.Backup, 1, metrics, folderPath)

	logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())

	statusUpdater(status)
}
// -----------------------------------------------------------------------------
// prefetchCollection
// -----------------------------------------------------------------------------
// prefetchCollection is a backup collection that fetches and serializes each
// of its items before placing them on the stream.
type prefetchCollection[I chatsItemer] struct {
data.BaseCollection
// protectedResource is passed to getItem when fetching each item.
protectedResource string
// stream is the channel returned by Items; streamItems publishes to it and
// closes it when finished.
stream chan data.Item
// contains describes the container holding the items; its container value
// is passed to augmentItemInfo.
contains container[I]
// items are the entries that will be fetched and streamed.
items []I
// getAndAugment fetches item data and augments the resulting item info.
getAndAugment getItemAndAugmentInfoer[I]
// statusUpdater receives the final status once streaming completes.
statusUpdater support.StatusUpdater
}
// NewCollection wraps the items in a backup collection.  When useLazyReader is
// set, the result is a lazyFetchCollection; otherwise it is a
// prefetchCollection.  Both flavors receive the same inputs and a buffered
// item stream of collectionChannelBufferSize entries.
//
// The state of the collection comes from baseCol, as an observation of its
// current and previous paths.  If the curr path is nil, the state is assumed
// to be deleted.  If the prev path is nil, it is assumed newly created.
// If both are populated, then state is either moved (if they differ),
// or notMoved (if they match).
func NewCollection[I chatsItemer](
	baseCol data.BaseCollection,
	getAndAugment getItemAndAugmentInfoer[I],
	protectedResource string,
	items []I,
	contains container[I],
	statusUpdater support.StatusUpdater,
	useLazyReader bool,
) data.BackupCollection {
	stream := make(chan data.Item, collectionChannelBufferSize)

	if !useLazyReader {
		return &prefetchCollection[I]{
			BaseCollection:    baseCol,
			protectedResource: protectedResource,
			stream:            stream,
			contains:          contains,
			items:             items,
			getAndAugment:     getAndAugment,
			statusUpdater:     statusUpdater,
		}
	}

	return &lazyFetchCollection[I]{
		BaseCollection:    baseCol,
		protectedResource: protectedResource,
		stream:            stream,
		contains:          contains,
		items:             items,
		getAndAugment:     getAndAugment,
		statusUpdater:     statusUpdater,
	}
}
// Items starts streaming the collection's items in the background and returns
// the channel that they will arrive on.
func (col *prefetchCollection[I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
	out := col.stream

	go col.streamItems(ctx, errs)

	return out
}
// streamItems fetches and serializes every chat in the collection, publishing
// each result on col.stream.  Fetches run concurrently, bounded by the
// ItemFetch parallelism option (at least one at a time).  Iteration stops
// early once the local fault bus reports a failure.
//
// When all workers have finished, the stream is closed and a final status is
// sent to col.statusUpdater.  Any item that cannot be fetched or serialized is
// recorded as a recoverable error labelled to prevent backup creation, and is
// not streamed.
func (col *prefetchCollection[I]) streamItems(ctx context.Context, errs *fault.Bus) {
	var (
		streamedItems   int64
		totalBytes      int64
		wg              sync.WaitGroup
		progressMessage chan<- struct{}
		el              = errs.Local()
	)

	ctx = clues.Add(ctx, "category", col.Category().String())

	// Deferred calls run last-in-first-out: the semaphore and progress
	// channels are closed before the stream is closed and the status sent.
	defer func() {
		close(col.stream)
		logger.Ctx(ctx).Infow(
			"finished stream backup collection items",
			"stats", col.Counter.Values())

		updateStatus(
			ctx,
			col.statusUpdater,
			len(col.items),
			streamedItems,
			totalBytes,
			col.FullPath().Folder(false),
			errs.Failure())
	}()

	if len(col.items) > 0 {
		progressMessage = observe.CollectionProgress(
			ctx,
			col.Category().HumanString(),
			col.LocationPath().Elements())
		defer close(progressMessage)
	}

	// A zero-capacity semaphore would block the first send below forever,
	// since slots are only released by workers that are started after the
	// send.  Always allow at least one fetch at a time.
	parallelism := col.Opts().Parallelism.ItemFetch
	if parallelism < 1 {
		parallelism = 1
	}

	semaphoreCh := make(chan struct{}, parallelism)
	defer close(semaphoreCh)

	// add any new items
	for _, item := range col.items {
		if el.Failure() != nil {
			break
		}

		wg.Add(1)
		semaphoreCh <- struct{}{}

		itemID := ptr.Val(item.GetId())

		go func(id string) {
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			writer := kjson.NewJsonSerializationWriter()
			defer writer.Close()

			item, info, err := col.getAndAugment.getItem(
				ctx,
				col.protectedResource,
				id)
			if err != nil {
				err = clues.Wrap(err, "getting chat data").Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)

				return
			}

			// The info is dereferenced below.  A panic in this goroutine would
			// take down the whole process, so treat missing info as an error.
			if info == nil {
				err = clues.New("missing chat info").Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)

				return
			}

			col.getAndAugment.augmentItemInfo(info, col.contains.container)

			if err := writer.WriteObjectValue("", item); err != nil {
				err = clues.Wrap(err, "writing chat to serializer").Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)

				return
			}

			itemData, err := writer.GetSerializedContent()
			if err != nil {
				err = clues.Wrap(err, "serializing chat").Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)

				return
			}

			info.ParentPath = col.LocationPath().String()

			storeItem, err := data.NewPrefetchedItemWithInfo(
				io.NopCloser(bytes.NewReader(itemData)),
				id,
				details.ItemInfo{TeamsChats: info})
			if err != nil {
				err = clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)

				return
			}

			col.stream <- storeItem

			// NOTE(review): the byte tallies are fed with the chat's message
			// count, not with the serialized size; confirm this is intended.
			atomic.AddInt64(&streamedItems, 1)
			atomic.AddInt64(&totalBytes, int64(info.Chat.MessageCount))

			if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
				logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
			}

			col.Counter.Add(count.StreamBytesAdded, int64(info.Chat.MessageCount))

			if progressMessage != nil {
				progressMessage <- struct{}{}
			}
		}(itemID)
	}

	wg.Wait()
}
// -----------------------------------------------------------------------------
// lazyFetchCollection
// -----------------------------------------------------------------------------
// lazyFetchCollection is a backup collection that streams lazy items; each
// item's data is retrieved through a lazyItemGetter rather than by
// streamItems itself.
type lazyFetchCollection[I chatsItemer] struct {
data.BaseCollection
// protectedResource is handed to each lazyItemGetter as its resource ID.
protectedResource string
// stream is the channel returned by Items; streamItems publishes to it and
// closes it when finished.
stream chan data.Item
// contains describes the container holding the items.
contains container[I]
// items are the entries that will be streamed.
items []I
// getAndAugment is handed to each lazyItemGetter for fetching item data.
getAndAugment getItemAndAugmentInfoer[I]
// statusUpdater receives the final status once streaming completes.
statusUpdater support.StatusUpdater
}
// Items starts streaming the collection's lazy items in the background and
// returns the channel that they will arrive on.
func (col *lazyFetchCollection[I]) Items(
	ctx context.Context,
	errs *fault.Bus,
) <-chan data.Item {
	out := col.stream

	go col.streamItems(ctx, errs)

	return out
}
// streamItems publishes one lazy item per chat on col.stream.  No chat data
// is fetched here: every item carries a lazyItemGetter that retrieves the
// data on demand, and is stamped with the chat's last-updated time.  Workers
// run concurrently, bounded by the ItemFetch parallelism option (at least one
// at a time).  Iteration stops early once the local fault bus reports a
// failure.
//
// When all workers have finished, the stream is closed and a final status,
// which always reports zero bytes, is sent to col.statusUpdater.
func (col *lazyFetchCollection[I]) streamItems(ctx context.Context, errs *fault.Bus) {
	var (
		streamedItems   int64
		wg              sync.WaitGroup
		progressMessage chan<- struct{}
		el              = errs.Local()
	)

	ctx = clues.Add(ctx, "category", col.Category().String())

	// Deferred calls run last-in-first-out: the semaphore and progress
	// channels are closed before the stream is closed and the status sent.
	defer func() {
		close(col.stream)
		logger.Ctx(ctx).Infow(
			"finished stream backup collection items",
			"stats", col.Counter.Values())

		updateStatus(
			ctx,
			col.statusUpdater,
			len(col.items),
			streamedItems,
			0,
			col.FullPath().Folder(false),
			errs.Failure())
	}()

	if len(col.items) > 0 {
		progressMessage = observe.CollectionProgress(
			ctx,
			col.Category().HumanString(),
			col.LocationPath().Elements())
		defer close(progressMessage)
	}

	// A zero-capacity semaphore would block the first send below forever,
	// since slots are only released by workers that are started after the
	// send.  Always allow at least one worker at a time.
	parallelism := col.Opts().Parallelism.ItemFetch
	if parallelism < 1 {
		parallelism = 1
	}

	semaphoreCh := make(chan struct{}, parallelism)
	defer close(semaphoreCh)

	// add any new items
	for _, item := range col.items {
		if el.Failure() != nil {
			break
		}

		itemID := ptr.Val(item.GetId())
		modTime := ptr.Val(item.GetLastUpdatedDateTime())

		wg.Add(1)
		semaphoreCh <- struct{}{}

		go func(id string, modTime time.Time) {
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			ictx := clues.Add(
				ctx,
				"item_id", id,
				"parent_path", path.LoggableDir(col.LocationPath().String()))

			col.stream <- data.NewLazyItemWithInfo(
				ictx,
				&lazyItemGetter[I]{
					modTime:       modTime,
					getAndAugment: col.getAndAugment,
					resourceID:    col.protectedResource,
					itemID:        id,
					containerIDs:  col.FullPath().Folders(),
					contains:      col.contains,
					parentPath:    col.LocationPath().String(),
				},
				id,
				modTime,
				col.Counter,
				el)

			atomic.AddInt64(&streamedItems, 1)

			if progressMessage != nil {
				progressMessage <- struct{}{}
			}
		}(itemID, modTime)
	}

	wg.Wait()
}
// lazyItemGetter carries everything needed to fetch and serialize a single
// item when GetData is called.
type lazyItemGetter[I chatsItemer] struct {
// getAndAugment fetches the item and augments its info.
getAndAugment getItemAndAugmentInfoer[I]
// resourceID and itemID are passed to getItem to identify the item.
resourceID string
itemID string
// parentPath is written to the item info's ParentPath.
parentPath string
// NOTE(review): containerIDs is populated by the collection but is not
// read by GetData.
containerIDs path.Elements
// modTime is written to the item info's Modified time.
modTime time.Time
// contains describes the container holding the item; its container value
// is passed to augmentItemInfo.
contains container[I]
}
// GetData fetches the item, serializes it to JSON, and returns a reader over
// the serialized bytes along with the item's info.  The info's parent path
// and modified time are overwritten with the values held by the getter.
//
// The boolean result reports whether the item was deleted in flight.  That is
// the case when the fetch fails with a not-found error; the error is then
// returned with the skippable label and is not added to errs.  Every other
// failure is added to errs as a recoverable error, labelled to prevent backup
// creation, and is also returned.
func (lig *lazyItemGetter[I]) GetData(
	ctx context.Context,
	errs *fault.Bus,
) (io.ReadCloser, *details.ItemInfo, bool, error) {
	// fail records the wrapped error on the fault bus and produces the
	// matching set of return values.
	fail := func(err error, msg string) (io.ReadCloser, *details.ItemInfo, bool, error) {
		err = clues.WrapWC(ctx, err, msg).Label(fault.LabelForceNoBackupCreation)
		errs.AddRecoverable(ctx, err)

		return nil, nil, false, err
	}

	writer := kjson.NewJsonSerializationWriter()
	defer writer.Close()

	item, info, err := lig.getAndAugment.getItem(ctx, lig.resourceID, lig.itemID)
	if err != nil {
		notFound := clues.HasLabel(err, graph.LabelStatus(http.StatusNotFound)) ||
			errors.Is(err, core.ErrNotFound)
		if !notFound {
			return fail(err, "getting item data")
		}

		// For items that were deleted in flight, add the skip label so that
		// they don't lead to recoverable failures during backup.
		logger.CtxErr(ctx, err).Info("item deleted in flight. skipping")

		// Returning delInFlight as true here for correctness, although the caller is going
		// to ignore it since we are returning an error.
		return nil, nil, true, clues.Wrap(err, "deleted item").Label(graph.LabelsSkippable)
	}

	lig.getAndAugment.augmentItemInfo(info, lig.contains.container)

	if err := writer.WriteObjectValue("", item); err != nil {
		return fail(err, "writing item to serializer")
	}

	itemData, err := writer.GetSerializedContent()
	if err != nil {
		return fail(err, "serializing item")
	}

	info.ParentPath = lig.parentPath
	// Update the mod time to what we already told kopia about. This is required
	// for proper details merging.
	info.Modified = lig.modTime

	body := io.NopCloser(bytes.NewReader(itemData))

	return body, &details.ItemInfo{TeamsChats: info}, false, nil
}

View File

@ -0,0 +1,475 @@
package teamschats
import (
"bytes"
"context"
"io"
"slices"
"testing"
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/teamschats/testdata"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/errs/core"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
// CollectionUnitSuite holds the unit tests for the teams chats collections
// and their items.
type CollectionUnitSuite struct {
tester.Suite
}
func TestCollectionUnitSuite(t *testing.T) {
suite.Run(t, &CollectionUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// TestPrefetchedItem_Reader checks that a prefetched item can be read back
// through a versioned restore reader: the format carries the default
// serialization version, is not flagged as deleted in flight, and the bytes
// and item ID come out unchanged.
func (suite *CollectionUnitSuite) TestPrefetchedItem_Reader() {
	const itemID = "itemID"

	table := []struct {
		name     string
		readData []byte
	}{
		{name: "HasData", readData: []byte("test message")},
		{name: "Empty", readData: []byte{}},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			src := io.NopCloser(bytes.NewReader(test.readData))

			ed, err := data.NewPrefetchedItemWithInfo(src, itemID, details.ItemInfo{})
			require.NoError(t, err, clues.ToCore(err))

			r, err := readers.NewVersionedRestoreReader(ed.ToReader())
			require.NoError(t, err, clues.ToCore(err))

			assert.Equal(t, readers.DefaultSerializationVersion, r.Format().Version)
			assert.False(t, r.Format().DelInFlight)

			var got bytes.Buffer

			_, err = got.ReadFrom(r)
			assert.NoError(t, err, "reading data: %v", clues.ToCore(err))

			assert.Equal(t, test.readData, got.Bytes(), "read data")
			assert.Equal(t, itemID, ed.ID(), "item ID")
		})
	}
}
// TestNewCollection_state checks that a collection built by NewCollection
// reports the state, full path, previous path, and location path implied by
// the base collection it was given, and that a non-lazy collection is a
// prefetchCollection.
func (suite *CollectionUnitSuite) TestNewCollection_state() {
	// build produces a chats path that ends in the given folder.
	build := func(folder string) path.Path {
		p, err := path.Build("t", "u", path.TeamsChatsService, path.ChatsCategory, false, folder)
		require.NoError(suite.T(), err, clues.ToCore(err))

		return p
	}

	var (
		fooP  = build("foo")
		barP  = build("bar")
		locPB = path.Builder{}.Append("human-readable")
	)

	table := []struct {
		name   string
		prev   path.Path
		curr   path.Path
		loc    *path.Builder
		expect data.CollectionState
	}{
		{name: "new", curr: fooP, loc: locPB, expect: data.NewState},
		{name: "not moved", prev: fooP, curr: fooP, loc: locPB, expect: data.NotMovedState},
		{name: "moved", prev: fooP, curr: barP, loc: locPB, expect: data.MovedState},
		{name: "deleted", prev: fooP, expect: data.DeletedState},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			base := data.NewBaseCollection(
				test.curr,
				test.prev,
				test.loc,
				control.DefaultOptions(),
				false,
				count.New())

			c := NewCollection[models.Chatable](
				base,
				nil,
				"g",
				nil,
				container[models.Chatable]{},
				nil,
				false)

			assert.Equal(t, test.expect, c.State(), "collection state")
			assert.Equal(t, test.curr, c.FullPath(), "full path")
			assert.Equal(t, test.prev, c.PreviousPath(), "prev path")

			prefetch, isPrefetch := c.(*prefetchCollection[models.Chatable])
			require.True(t, isPrefetch, "collection type")

			assert.Equal(t, test.loc, prefetch.LocationPath(), "location path")
		})
	}
}
// getAndAugmentChat is a test double for fetching chats and augmenting their
// info.
type getAndAugmentChat struct {
// err is returned by every getItem call.
err error
}
// getItem fabricates a chat whose ID and topic both equal itemID, and returns
// it with an empty info struct and the configured error.
//
//lint:ignore U1000 false linter issue due to generics
func (m getAndAugmentChat) getItem(
	_ context.Context,
	_ string,
	itemID string,
) (models.Chatable, *details.TeamsChatsInfo, error) {
	chat := models.NewChat()
	chat.SetTopic(ptr.To(itemID))
	chat.SetId(ptr.To(itemID))

	info := &details.TeamsChatsInfo{}

	return chat, info, m.err
}
// augmentItemInfo leaves the item info untouched.
//
//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChat) augmentItemInfo(
	_ *details.TeamsChatsInfo,
	_ models.Chatable,
) {
	// intentionally empty
}
// TestPrefetchCollection_streamItems drains a prefetchCollection's stream and
// checks that every streamed item maps to one of the input chats, that none
// is marked deleted, that no failure is recorded, and that the number of
// streamed items equals the number of input chats.
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
	t := suite.T()
	noopUpdater := func(*support.ControllerOperationStatus) {}

	fullPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	locPath, err := path.Build("t", "pr", path.TeamsChatsService, path.ChatsCategory, false, "fnords", "smarf")
	require.NoError(t, err, clues.ToCore(err))

	table := []struct {
		name  string
		items []models.Chatable
	}{
		{name: "no items"},
		{name: "items", items: testdata.StubChats("fisher", "flannigan", "fitzbog")},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()
			errs := fault.New(true)

			ctx, flush := tester.NewContext(t)
			defer flush()

			base := data.NewBaseCollection(
				fullPath,
				nil,
				locPath.ToBuilder(),
				control.DefaultOptions(),
				false,
				count.New())

			col := &prefetchCollection[models.Chatable]{
				BaseCollection: base,
				items:          test.items,
				contains:       container[models.Chatable]{},
				getAndAugment:  getAndAugmentChat{},
				stream:         make(chan data.Item),
				statusUpdater:  noopUpdater,
			}

			go col.streamItems(ctx, errs)

			seen := 0

			for item := range col.stream {
				seen++

				known := slices.ContainsFunc(test.items, func(mc models.Chatable) bool {
					return ptr.Val(mc.GetId()) == item.ID()
				})

				require.True(t, known, "item must be either added or removed: %q", item.ID())
				assert.False(t, item.Deleted(), "additions should not be marked as deleted")
			}

			assert.NoError(t, errs.Failure())
			assert.Equal(t, len(test.items), seen, "should see all expected items")
		})
	}
}
// TestLazyFetchCollection_Items_LazyFetch drains the channel returned by a
// lazyFetchCollection's Items and checks that every streamed item maps to one
// of the input chats, that none is marked deleted, that no failure is
// recorded, and that the expected number of items was streamed.
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
	t := suite.T()
	noopUpdater := func(*support.ControllerOperationStatus) {}

	fullPath, err := path.BuildPrefix("t", "pr", path.TeamsChatsService, path.ChatsCategory)
	require.NoError(t, err, clues.ToCore(err))

	locPath, err := path.BuildPrefix("t", "pr", path.TeamsChatsService, path.ChatsCategory)
	require.NoError(t, err, clues.ToCore(err))

	table := []struct {
		name            string
		items           []models.Chatable
		expectItemCount int
		// Items we want to trigger lazy reader on.
		expectReads []string
	}{
		{name: "no items"},
		{
			name:            "items",
			items:           testdata.StubChats("fisher", "flannigan", "fitzbog"),
			expectItemCount: 3,
			expectReads:     []string{"fisher", "flannigan", "fitzbog"},
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()
			errs := fault.New(true)

			ctx, flush := tester.NewContext(t)
			defer flush()

			base := data.NewBaseCollection(
				fullPath,
				nil,
				locPath.ToBuilder(),
				control.DefaultOptions(),
				false,
				count.New())

			col := &lazyFetchCollection[models.Chatable]{
				BaseCollection: base,
				items:          test.items,
				contains:       container[models.Chatable]{},
				getAndAugment:  &getAndAugmentChat{},
				stream:         make(chan data.Item),
				statusUpdater:  noopUpdater,
			}

			seen := 0

			for item := range col.Items(ctx, errs) {
				seen++

				known := slices.ContainsFunc(test.items, func(mc models.Chatable) bool {
					return ptr.Val(mc.GetId()) == item.ID()
				})

				require.True(t, known, "item must be either added or removed: %q", item.ID())
				assert.False(t, item.Deleted(), "additions should not be marked as deleted")
			}

			assert.NoError(t, errs.Failure())
			assert.Equal(t, test.expectItemCount, seen, "should see all expected items")
		})
	}
}
// TestLazyItem_GetDataErrors builds a lazy item whose getter is configured to
// fail and checks that the failure surfaces when the item is read, and that
// Info() reports an error as well. The item's deleted flag and mod time must
// still be available without triggering the getter.
func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() {
	const parentPath = ""

	modTime := time.Now()

	cases := []struct {
		name        string
		getterErr   error
		wantReadErr error
	}{
		{
			name:        "ReturnsErrorOnGenericGetError",
			getterErr:   assert.AnError,
			wantReadErr: assert.AnError,
		},
	}

	for _, tc := range cases {
		suite.Run(tc.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			getter := &lazyItemGetter[models.Chatable]{
				resourceID:    "resourceID",
				itemID:        "itemID",
				getAndAugment: &getAndAugmentChat{err: tc.getterErr},
				modTime:       modTime,
				parentPath:    parentPath,
			}

			item := data.NewLazyItemWithInfo(
				ctx,
				getter,
				"itemID",
				modTime,
				count.New(),
				fault.New(true))

			assert.False(t, item.Deleted(), "item shouldn't be marked deleted")
			assert.Equal(t, modTime, item.ModTime(), "item mod time")

			// Constructing the versioned reader is the first read of the
			// item, so this is where the getter's error is expected to appear.
			_, err := readers.NewVersionedRestoreReader(item.ToReader())
			assert.ErrorIs(t, err, tc.wantReadErr)

			// Should get some form of error when trying to get info.
			_, err = item.Info()
			assert.Error(t, err, "Info()")
		})
	}
}
// TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight builds a lazy item whose
// getter fails with core.ErrNotFound and checks that reading the item
// surfaces that same error.
//
// NOTE(review): the test name and the assertion message talk about an empty
// reader / deleted-in-flight marker, but what is asserted is that the
// not-found error propagates. Confirm which behavior is intended.
func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlight() {
	const parentPath = ""

	t := suite.T()
	modTime := time.Now()

	ctx, flush := tester.NewContext(t)
	defer flush()

	getter := &lazyItemGetter[models.Chatable]{
		resourceID:    "resourceID",
		itemID:        "itemID",
		getAndAugment: &getAndAugmentChat{err: core.ErrNotFound},
		modTime:       modTime,
		parentPath:    parentPath,
	}

	item := data.NewLazyItemWithInfo(
		ctx,
		getter,
		"itemID",
		modTime,
		count.New(),
		fault.New(true))

	assert.False(t, item.Deleted(), "item shouldn't be marked deleted")
	assert.Equal(t, modTime, item.ModTime(), "item mod time")

	_, err := readers.NewVersionedRestoreReader(item.ToReader())
	assert.ErrorIs(t, err, core.ErrNotFound, "item should be marked deleted in flight")
}
// TestLazyItem covers the success path of a lazy item backed by a getter with
// no configured error: the versioned reader reports the default serialization
// version without the deleted-in-flight flag, the payload is non-empty, and
// the item info carries the mod time handed to the item.
func (suite *CollectionUnitSuite) TestLazyItem() {
	t := suite.T()
	parentPath := ""
	modTime := time.Now()

	ctx, flush := tester.NewContext(t)
	defer flush()

	getter := &lazyItemGetter[models.Chatable]{
		resourceID:    "resourceID",
		itemID:        "itemID",
		getAndAugment: &getAndAugmentChat{},
		modTime:       modTime,
		parentPath:    parentPath,
	}

	item := data.NewLazyItemWithInfo(
		ctx,
		getter,
		"itemID",
		modTime,
		count.New(),
		fault.New(true))

	assert.False(t, item.Deleted(), "item shouldn't be marked deleted")
	assert.Equal(t, modTime, item.ModTime(), "item mod time")

	reader, err := readers.NewVersionedRestoreReader(item.ToReader())
	require.NoError(t, err, clues.ToCore(err))

	assert.Equal(t, readers.DefaultSerializationVersion, reader.Format().Version)
	assert.False(t, reader.Format().DelInFlight)

	payload, err := io.ReadAll(reader)
	assert.NoError(t, err, "reading item data: %v", clues.ToCore(err))
	assert.NotEmpty(t, payload, "read item data")

	info, err := item.Info()
	assert.NoError(t, err, "getting item info: %v", clues.ToCore(err))

	// NOTE(review): this asserts on the local parentPath value, not on
	// anything read back from info, so it can never fail. It presumably was
	// meant to compare against the parent path stored in info. TODO confirm.
	assert.Empty(t, parentPath)
	assert.Equal(t, modTime, info.Modified())
}

View File

@ -0,0 +1,17 @@
package teamschats
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/store"
)
// DeserializeMetadataFiles exists so the teams chats package offers the same
// entry point as the other collection packages. It never reads the provided
// collections: it always returns a nil slice and an error stating that no
// metadata is stored for this service/category.
func DeserializeMetadataFiles(
	ctx context.Context,
	colls []data.RestoreCollection,
) ([]store.MetadataFile, error) {
	var noFiles []store.MetadataFile

	return noFiles, clues.New("no metadata stored for this service/category")
}

View File

@ -0,0 +1,93 @@
package teamschats
import (
"context"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// chatsItemer standardizes common behavior that can be expected from all
// items within a chats collection backup. It is used as the type constraint
// for the generic handlers and containers in this package, and composes the
// kiota Parsable contract with the graph ID getter and the graph
// last-updated-time getter.
type chatsItemer interface {
serialization.Parsable
graph.GetIDer
graph.GetLastUpdatedDateTimer
}
// backupHandler is the full set of capabilities a handler must provide to
// back up items of type I: container lookup, item retrieval with info
// augmentation, item enumeration, scope filtering, and canonical path
// construction.
type backupHandler[I chatsItemer] interface {
getContainerer[I]
getItemAndAugmentInfoer[I]
// getItemer is already embedded by getItemAndAugmentInfoer; listing it
// again adds no methods.
getItemer[I]
getItemIDser[I]
includeItemer[I]
canonicalPather
}
// getContainerer gets the container for the resource.
// Within this handler set, only one container (the root)
// is expected.
type getContainerer[I chatsItemer] interface {
// getContainer returns the container using the provided call
// configuration, or an error if it cannot be retrieved.
getContainer(
ctx context.Context,
cc api.CallConfig,
) (container[I], error)
}
// getItemAndAugmentInfoer combines item retrieval (getItemer) with info
// augmentation (augmentItemInfoer), so a single value can both fetch an item
// and complete the info returned alongside it.
type getItemAndAugmentInfoer[I chatsItemer] interface {
getItemer[I]
augmentItemInfoer[I]
}
// augmentItemInfoer fills in item info that cannot be derived from the item
// alone.
type augmentItemInfoer[I chatsItemer] interface {
// augmentItemInfo completes the teamChatsInfo population with any data
// owned by the container and not accessible to the item.
// The info is passed by pointer and there is no return value, so
// implementations are expected to update it in place.
augmentItemInfo(*details.TeamsChatsInfo, I)
}
// getItemIDser enumerates the items in the container.
// NOTE(review): despite the "IDs" in the name, the method returns values of
// the item type I rather than plain ID strings. Presumably callers only rely
// on the ID-bearing fields of each result; confirm against implementations.
type getItemIDser[I chatsItemer] interface {
// getItemIDs returns the enumerated items using the provided call
// configuration, or an error if enumeration fails.
getItemIDs(
ctx context.Context,
cc api.CallConfig,
) ([]I, error)
}
// getItemer retrieves a single item by ID for a protected resource.
type getItemer[I chatsItemer] interface {
// getItem returns the item identified by itemID under protectedResource,
// together with its TeamsChatsInfo, or an error.
getItem(
ctx context.Context,
protectedResource string,
itemID string,
) (I, *details.TeamsChatsInfo, error)
}
// includeItemer evaluates whether the item is included
// in the provided scope.
type includeItemer[I chatsItemer] interface {
// includeItem reports whether item i is included in scope.
includeItem(
i I,
scope selectors.TeamsChatsScope,
) bool
}
// canonicalPather constructs the service and category specific path.
// The method takes no arguments: the tenantID and protectedResourceID
// are assumed to be stored in the handler already.
type canonicalPather interface {
// canonicalPath returns the constructed path, or an error if it cannot
// be built.
canonicalPath() (path.Path, error)
}
// ---------------------------------------------------------------------------
// Container management
// ---------------------------------------------------------------------------
// container bundles a container value of the item type I with two sets of
// path elements describing it.
type container[I chatsItemer] struct {
// presumably the folder elements used for the storage path. TODO confirm
storageDirFolders path.Elements
// presumably the human-readable location elements. TODO confirm
humanLocation path.Elements
// the wrapped container value itself
container I
}

View File

@ -0,0 +1,40 @@
package testdata
import (
"github.com/google/uuid"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
)
// StubChats builds one stub chat per provided id. Each chat uses the id as
// both its ID and its topic. The result is always non-nil and has the same
// length and order as ids.
func StubChats(ids ...string) []models.Chatable {
	chats := make([]models.Chatable, len(ids))

	for i := range ids {
		chat := models.NewChat()
		chat.SetTopic(ptr.To(ids[i]))
		chat.SetId(ptr.To(ids[i]))

		chats[i] = chat
	}

	return chats
}
// StubChatMessages builds one stub chat message per provided id. The message
// ID is a freshly generated UUID; the provided id is stored as the body
// content instead. The result is always non-nil and has the same length and
// order as ids.
//
// NOTE(review): callers passing "ids" may expect them to become the message
// IDs. Confirm that using them as body content is intentional.
func StubChatMessages(ids ...string) []models.ChatMessageable {
	messages := make([]models.ChatMessageable, len(ids))

	for i := range ids {
		msg := models.NewChatMessage()
		msg.SetId(ptr.To(uuid.NewString()))

		content := models.NewItemBody()
		content.SetContent(ptr.To(ids[i]))
		msg.SetBody(content)

		messages[i] = msg
	}

	return messages
}

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/collection/drive"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/collection/groups"
"github.com/alcionai/corso/src/internal/m365/collection/teamschats"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/store"
@ -34,6 +35,8 @@ func (ctrl *Controller) DeserializeMetadataFiles(
return drive.DeserializeMetadataFiles(ctx, colls, count.New())
case path.GroupsService, path.GroupsMetadataService:
return groups.DeserializeMetadataFiles(ctx, colls)
case path.TeamsChatsService, path.TeamsChatsMetadataService:
return teamschats.DeserializeMetadataFiles(ctx, colls)
default:
return nil, clues.NewWC(ctx, "unrecognized service").With("service", service)
}

View File

@ -42,6 +42,7 @@ const (
CollectionNotMoved Key = "collection-state-not-moved"
CollectionTombstoned Key = "collection-state-tombstoned"
Collections Key = "collections"
Containers Key = "containers"
DeleteFolderMarker Key = "delete-folder-marker"
DeleteItemMarker Key = "delete-item-marker"
Drives Key = "drives"
@ -66,6 +67,7 @@ const (
Sites Key = "sites"
Lists Key = "lists"
SkippedContainers Key = "skipped-containers"
SkippedItems Key = "skipped-items"
StreamBytesAdded Key = "stream-bytes-added"
StreamDirsAdded Key = "stream-dirs-added"
StreamDirsFound Key = "stream-dirs-found"

View File

@ -4,7 +4,10 @@ import (
"github.com/alcionai/corso/src/pkg/selectors"
)
const TestChannelName = "Test"
// Well-known names of test fixtures.
const (
// TestChannelName is presumably the name of the channel targeted by
// groups integration tests. TODO confirm against callers
TestChannelName = "Test"
// TestChatTopic is the value TeamsChatsBackupChatScope passes to the
// chats selector.
TestChatTopic = "Test"
)
// GroupsBackupFolderScope is the standard folder scope that should be used
// in integration backups with groups when interacting with libraries.
@ -24,3 +27,9 @@ func GroupsBackupConversationScope(sel *selectors.GroupsBackup) []selectors.Grou
// there's no way to easily specify a test conversation by name.
return sel.Conversation(selectors.Any())
}
// TeamsChatsBackupChatScope is the standard chat scope that should be used
// in integration backups with teams chats when interacting with chats.
// It selects chats from sel using the single TestChatTopic value.
func TeamsChatsBackupChatScope(sel *selectors.TeamsChatsBackup) []selectors.TeamsChatsScope {
	topics := []string{TestChatTopic}

	return sel.Chats(topics)
}

View File

@ -12,6 +12,10 @@ type GetLastModifiedDateTimer interface {
GetLastModifiedDateTime() *time.Time
}
// GetLastUpdatedDateTimer is implemented by values that expose a
// last-updated timestamp.
type GetLastUpdatedDateTimer interface {
// GetLastUpdatedDateTime returns the timestamp as a pointer.
// presumably nil when the value carries no timestamp. TODO confirm
GetLastUpdatedDateTime() *time.Time
}
// GetAdditionalDataer is implemented by values that expose a map of
// additional data keyed by string.
type GetAdditionalDataer interface {
// GetAdditionalData returns the additional data map.
GetAdditionalData() map[string]any
}