Compare commits: main...conv_kopia
6 commits

| Author | SHA1 | Date |
|---|---|---|
| | c5cf93deb3 | |
| | 276ace62f0 | |
| | a58559d6c2 | |
| | 3bfadd5416 | |
| | f21ef8ccb5 | |
| | 74acdfe516 | |
@@ -464,10 +464,6 @@ func runGroupsDetailsCmdTest(suite *PreparedBackupGroupsE2ESuite, category path.

     t := suite.T()

-    if category == path.ConversationPostsCategory {
-        t.Skip("skipping conversation details test, see issue #4780")
-    }
-
     ctx, flush := tester.NewContext(t)
     ctx = config.SetViper(ctx, suite.dpnd.vpr)

@@ -4,7 +4,6 @@ import (
     "context"

     "github.com/alcionai/clues"
-    "golang.org/x/exp/maps"

     "github.com/alcionai/corso/src/internal/common/pii"
     "github.com/alcionai/corso/src/internal/common/ptr"
@@ -69,7 +68,7 @@ func CreateCollections[C graph.GetIDer, I groupsItemer](

     counter.Add(count.Channels, int64(len(containers)))

-    collections, err := populateCollections(
+    collections, err := populateCollections[C, I](
         ctx,
         qp,
         bh,
@@ -174,7 +173,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer](
             continue
         }

-        added := str.SliceToMap(maps.Keys(addAndRem.Added))
+        added := addAndRem.Added
         removed := str.SliceToMap(addAndRem.Removed)

         cl.Add(count.ItemsAdded, int64(len(added)))
@@ -213,6 +212,7 @@ func populateCollections[C graph.GetIDer, I groupsItemer](
             qp.ProtectedResource.ID(),
             added,
             removed,
+            c,
             statusUpdater)

         collections[c.storageDirFolders.String()] = &edc
@@ -39,6 +39,7 @@ import (

 var _ backupHandler[models.Channelable, models.ChatMessageable] = &mockBackupHandler{}

+//lint:ignore U1000 false linter issue due to generics
 type mockBackupHandler struct {
     channels   []models.Channelable
     messageIDs []string
@@ -50,6 +51,14 @@ type mockBackupHandler struct {
     doNotInclude bool
 }

+//lint:ignore U1000 false linter issue due to generics
+func (bh mockBackupHandler) augmentItemInfo(
+    *details.GroupsInfo,
+    models.Channelable,
+) {
+    // no-op
+}
+
 func (bh mockBackupHandler) canMakeDeltaQueries() bool {
     return true
 }
@@ -116,7 +125,8 @@ func (bh mockBackupHandler) canonicalPath(
         false)
 }

-func (bh mockBackupHandler) GetItem(
+//lint:ignore U1000 false linter issue due to generics
+func (bh mockBackupHandler) getItem(
     _ context.Context,
     _ string,
     _ path.Elements,
@@ -95,7 +95,8 @@ func (bh channelsBackupHandler) PathPrefix(tenantID string) (path.Path, error) {
         false)
 }

-func (bh channelsBackupHandler) GetItem(
+//lint:ignore U1000 false linter issue due to generics
+func (bh channelsBackupHandler) getItem(
     ctx context.Context,
     groupID string,
     containerIDs path.Elements,
@@ -104,6 +105,14 @@ func (bh channelsBackupHandler) GetItem(
     return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID)
 }

+//lint:ignore U1000 false linter issue due to generics
+func (bh channelsBackupHandler) augmentItemInfo(
+    dgi *details.GroupsInfo,
+    c models.Channelable,
+) {
+    // no-op
+}
+
 func channelContainer(ch models.Channelable) container[models.Channelable] {
     return container[models.Channelable]{
         storageDirFolders: path.Elements{ptr.Val(ch.GetId())},
@@ -3,9 +3,11 @@ package groups
 import (
     "bytes"
     "context"
+    "fmt"
     "io"
     "sync"
     "sync/atomic"
+    "time"

     "github.com/alcionai/clues"
     kjson "github.com/microsoft/kiota-serialization-json-go"
@@ -17,26 +19,30 @@ import (
     "github.com/alcionai/corso/src/pkg/count"
     "github.com/alcionai/corso/src/pkg/fault"
     "github.com/alcionai/corso/src/pkg/logger"
+    "github.com/alcionai/corso/src/pkg/path"
+    "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
 )

-var _ data.BackupCollection = &Collection[groupsItemer]{}
+var _ data.BackupCollection = &Collection[graph.GetIDer, groupsItemer]{}

 const (
     collectionChannelBufferSize = 1000
     numberOfRetries             = 4
 )

-type Collection[I groupsItemer] struct {
+type Collection[C graph.GetIDer, I groupsItemer] struct {
     data.BaseCollection
     protectedResource string
     stream            chan data.Item

+    contains container[C]
+
     // added is a list of existing item IDs that were added to a container
-    added map[string]struct{}
+    added map[string]time.Time
     // removed is a list of item IDs that were deleted from, or moved out, of a container
     removed map[string]struct{}

-    getter getItemer[I]
+    getAndAugment getItemAndAugmentInfoer[C, I]

     statusUpdater support.StatusUpdater
 }
@@ -47,18 +53,20 @@ type Collection[I groupsItemer] struct {
 // to be deleted. If the prev path is nil, it is assumed newly created.
 // If both are populated, then state is either moved (if they differ),
 // or notMoved (if they match).
-func NewCollection[I groupsItemer](
+func NewCollection[C graph.GetIDer, I groupsItemer](
     baseCol data.BaseCollection,
-    getter getItemer[I],
+    getAndAugment getItemAndAugmentInfoer[C, I],
     protectedResource string,
-    added map[string]struct{},
+    added map[string]time.Time,
     removed map[string]struct{},
+    contains container[C],
     statusUpdater support.StatusUpdater,
-) Collection[I] {
-    collection := Collection[I]{
+) Collection[C, I] {
+    collection := Collection[C, I]{
         BaseCollection: baseCol,
         added:          added,
-        getter:         getter,
+        contains:       contains,
+        getAndAugment:  getAndAugment,
         removed:        removed,
         statusUpdater:  statusUpdater,
         stream:         make(chan data.Item, collectionChannelBufferSize),
@@ -70,7 +78,7 @@ func NewCollection[I groupsItemer](

 // Items utility function to asynchronously execute process to fill data channel with
 // M365 exchange objects and returns the data channel
-func (col *Collection[I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
+func (col *Collection[C, I]) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
     go col.streamItems(ctx, errs)
     return col.stream
 }
@@ -79,7 +87,7 @@ func (col *Collection[I]) Items(ctx context.Context, errs *fault.Bus) <-chan dat
 // items() production
 // ---------------------------------------------------------------------------

-func (col *Collection[I]) streamItems(ctx context.Context, errs *fault.Bus) {
+func (col *Collection[C, I]) streamItems(ctx context.Context, errs *fault.Bus) {
     var (
         streamedItems int64
         totalBytes    int64
@@ -130,84 +138,102 @@ func (col *Collection[I]) streamItems(ctx context.Context, errs *fault.Bus) {
     }

     // add any new items
-    for id := range col.added {
+    for id, modTime := range col.added {
         if el.Failure() != nil {
             break
         }

-        wg.Add(1)
-        semaphoreCh <- struct{}{}
+        col.stream <- data.NewLazyItemWithInfo(
+            ctx,
+            &lazyItemGetter[C, I]{
+                modTime:       modTime,
+                getAndAugment: col.getAndAugment,
+                userID:        col.protectedResource,
+                itemID:        id,
+                containerIDs:  col.FullPath().Folders(),
+                contains:      col.contains,
+                parentPath:    col.LocationPath().String(),
+            },
+            id,
+            modTime,
+            col.Counter,
+            el)

-        go func(id string) {
-            defer wg.Done()
-            defer func() { <-semaphoreCh }()
+        // wg.Add(1)
+        // semaphoreCh <- struct{}{}

-            writer := kjson.NewJsonSerializationWriter()
-            defer writer.Close()
+        // go func(id string) {
+        //     defer wg.Done()
+        //     defer func() { <-semaphoreCh }()

-            item, info, err := col.getter.GetItem(
-                ctx,
-                col.protectedResource,
-                col.FullPath().Folders(),
-                id)
-            if err != nil {
-                err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
-                el.AddRecoverable(ctx, err)
+        //     writer := kjson.NewJsonSerializationWriter()
+        //     defer writer.Close()

-                return
-            }
+        //     item, info, err := col.getAndAugment.getItem(
+        //         ctx,
+        //         col.protectedResource,
+        //         col.FullPath().Folders(),
+        //         id)
+        //     if err != nil {
+        //         err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
+        //         el.AddRecoverable(ctx, err)

-            if err := writer.WriteObjectValue("", item); err != nil {
-                err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
-                el.AddRecoverable(ctx, err)
+        //         return
+        //     }

-                return
-            }
+        //     col.getAndAugment.augmentItemInfo(info, col.contains.container)

-            itemData, err := writer.GetSerializedContent()
-            if err != nil {
-                err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
-                el.AddRecoverable(ctx, err)
+        //     if err := writer.WriteObjectValue("", item); err != nil {
+        //         err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
+        //         el.AddRecoverable(ctx, err)

-                return
-            }
+        //         return
+        //     }

-            info.ParentPath = col.LocationPath().String()
+        //     itemData, err := writer.GetSerializedContent()
+        //     if err != nil {
+        //         err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
+        //         el.AddRecoverable(ctx, err)

-            storeItem, err := data.NewPrefetchedItemWithInfo(
-                io.NopCloser(bytes.NewReader(itemData)),
-                id,
-                details.ItemInfo{Groups: info})
-            if err != nil {
-                err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
-                el.AddRecoverable(ctx, err)
+        //         return
+        //     }

-                return
-            }
+        //     info.ParentPath = col.LocationPath().String()

-            col.stream <- storeItem
+        //     storeItem, err := data.NewPrefetchedItemWithInfo(
+        //         io.NopCloser(bytes.NewReader(itemData)),
+        //         id,
+        //         details.ItemInfo{Groups: info})
+        //     if err != nil {
+        //         err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
+        //         el.AddRecoverable(ctx, err)

-            atomic.AddInt64(&streamedItems, 1)
-            atomic.AddInt64(&totalBytes, info.Size)
+        //         return
+        //     }

-            if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
-                logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
-            }
+        //     col.stream <- storeItem

-            col.Counter.Add(count.StreamBytesAdded, info.Size)
+        //     atomic.AddInt64(&streamedItems, 1)
+        //     atomic.AddInt64(&totalBytes, info.Size)

-            if colProgress != nil {
-                colProgress <- struct{}{}
-            }
-        }(id)
+        //     if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
+        //         logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
+        //     }
+
+        //     col.Counter.Add(count.StreamBytesAdded, info.Size)
+
+        if colProgress != nil {
+            colProgress <- struct{}{}
+        }
+        // }(id)
     }

-    wg.Wait()
+    // wg.Wait()
 }

 // finishPopulation is a utility function used to close a Collection's data channel
 // and to send the status update through the channel.
-func (col *Collection[I]) finishPopulation(
+func (col *Collection[C, I]) finishPopulation(
     ctx context.Context,
     streamedItems, totalBytes int64,
     err error,
@@ -230,3 +256,72 @@ func (col *Collection[I]) finishPopulation(

     col.statusUpdater(status)
 }
+
+type lazyItemGetter[C graph.GetIDer, I groupsItemer] struct {
+    getAndAugment getItemAndAugmentInfoer[C, I]
+    userID        string
+    itemID        string
+    containerIDs  path.Elements
+    parentPath    string
+    modTime       time.Time
+    contains      container[C]
+}
+
+func (lig *lazyItemGetter[C, I]) GetData(
+    ctx context.Context,
+    errs *fault.Bus,
+) (io.ReadCloser, *details.ItemInfo, bool, error) {
+    writer := kjson.NewJsonSerializationWriter()
+    defer writer.Close()
+
+    fmt.Println("getdata itemID: ", lig.itemID)
+    item, info, err := lig.getAndAugment.getItem(
+        ctx,
+        lig.userID,
+        lig.containerIDs,
+        lig.itemID)
+    if err != nil {
+        // If an item was deleted then return an empty file so we don't fail
+        // the backup and return a sentinel error when asked for ItemInfo so
+        // we don't display the item in the backup.
+        //
+        // The item will be deleted from kopia on the next backup when the
+        // delta token shows it's removed.
+        if graph.IsErrDeletedInFlight(err) {
+            logger.CtxErr(ctx, err).Info("item not found")
+            return nil, nil, true, nil
+        }
+
+        err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
+        errs.AddRecoverable(ctx, err)
+
+        return nil, nil, false, err
+    }
+
+    lig.getAndAugment.augmentItemInfo(info, lig.contains.container)
+
+    if err := writer.WriteObjectValue("", item); err != nil {
+        err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
+        errs.AddRecoverable(ctx, err)
+
+        return nil, nil, false, err
+    }
+
+    itemData, err := writer.GetSerializedContent()
+    if err != nil {
+        err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
+        errs.AddRecoverable(ctx, err)
+
+        return nil, nil, false, err
+    }
+
+    info.ParentPath = lig.parentPath
+    // Update the mod time to what we already told kopia about. This is required
+    // for proper details merging.
+    info.Modified = lig.modTime
+
+    return io.NopCloser(bytes.NewReader(itemData)),
+        &details.ItemInfo{Groups: info},
+        false,
+        nil
+}
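For context on the pattern the two hunks above switch to: the collection no longer serializes every message up front with data.NewPrefetchedItemWithInfo; it enqueues a lazy item right away and defers the Graph fetch, augmentItemInfo, and JSON serialization until the backup engine actually reads the item, with the boolean in GetData's return flagging items deleted in flight. Below is a rough, self-contained Go sketch of that deferred-materialization shape; every name in it is illustrative, not the corso API.

package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// dataGetter mirrors the shape of lazyItemGetter.GetData in the diff:
// bytes, item info, a "deleted in flight" flag, and an error.
type dataGetter interface {
	GetData(ctx context.Context) (io.ReadCloser, string, bool, error)
}

// lazyItem defers the fetch until the consumer first reads the item.
type lazyItem struct {
	id      string
	modTime time.Time
	getter  dataGetter

	rc io.ReadCloser
}

func (li *lazyItem) Read(p []byte) (int, error) {
	if li.rc == nil {
		rc, info, deleted, err := li.getter.GetData(context.Background())
		if err != nil {
			return 0, err
		}
		if deleted {
			// An empty body stands in for items deleted between enumeration and fetch.
			rc = io.NopCloser(strings.NewReader(""))
		}
		_ = info
		li.rc = rc
	}
	return li.rc.Read(p)
}

// messageGetter is a toy getter standing in for the Graph call.
type messageGetter struct{ body string }

func (g messageGetter) GetData(ctx context.Context) (io.ReadCloser, string, bool, error) {
	return io.NopCloser(strings.NewReader(g.body)), "info", false, nil
}

func main() {
	item := &lazyItem{
		id:      "msg-1",
		modTime: time.Now(),
		getter:  messageGetter{body: `{"id":"msg-1"}`},
	}

	// Nothing is fetched until the first Read.
	b, _ := io.ReadAll(item)
	fmt.Println(item.id, string(b))
}

Deferring the fetch keeps enumeration cheap, and items deleted between enumeration and read degrade to empty bodies instead of failing the backup, matching the comment in the GetData hunk above.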
@@ -2,6 +2,7 @@ package groups

 import (
     "bytes"
+    "context"
     "io"
     "testing"
     "time"
@@ -12,9 +13,9 @@ import (
     "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"

+    "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/internal/common/readers"
     "github.com/alcionai/corso/src/internal/data"
-    "github.com/alcionai/corso/src/internal/m365/collection/groups/mock"
     "github.com/alcionai/corso/src/internal/m365/support"
     "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/pkg/backup/details"
@@ -117,7 +118,7 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
         suite.Run(test.name, func() {
             t := suite.T()

-            c := NewCollection[models.ChatMessageable](
+            c := NewCollection[models.Channelable, models.ChatMessageable](
                 data.NewBaseCollection(
                     test.curr,
                     test.prev,
@@ -127,7 +128,9 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
                     count.New()),
                 nil,
                 "g",
-                nil, nil,
+                nil,
+                nil,
+                container[models.Channelable]{},
                 nil)
             assert.Equal(t, test.expect, c.State(), "collection state")
             assert.Equal(t, test.curr, c.FullPath(), "full path")
@@ -137,6 +140,28 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
     }
 }

+type getAndAugmentChannelMessage struct {
+    Err error
+}
+
+//lint:ignore U1000 false linter issue due to generics
+func (m getAndAugmentChannelMessage) getItem(
+    _ context.Context,
+    _ string,
+    _ path.Elements,
+    itemID string,
+) (models.ChatMessageable, *details.GroupsInfo, error) {
+    msg := models.NewChatMessage()
+    msg.SetId(ptr.To(itemID))
+
+    return msg, &details.GroupsInfo{}, m.Err
+}
+
+//lint:ignore U1000 false linter issue due to generics
+func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.Channelable) {
+    // no-op
+}
+
 func (suite *CollectionUnitSuite) TestCollection_streamItems() {
     var (
         t = suite.T()
@@ -199,7 +224,7 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() {
         ctx, flush := tester.NewContext(t)
         defer flush()

-        col := &Collection[models.ChatMessageable]{
+        col := &Collection[models.Channelable, models.ChatMessageable]{
             BaseCollection: data.NewBaseCollection(
                 fullPath,
                 nil,
@@ -208,8 +233,9 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() {
                 false,
                 count.New()),
             added:         test.added,
+            contains:      container[models.Channelable]{},
             removed:       test.removed,
-            getter:        mock.GetChannelMessage{},
+            getAndAugment: getAndAugmentChannelMessage{},
             stream:        make(chan data.Item),
             statusUpdater: statusUpdater,
         }
@@ -113,7 +113,8 @@ func (bh conversationsBackupHandler) PathPrefix(tenantID string) (path.Path, err
         false)
 }

-func (bh conversationsBackupHandler) GetItem(
+//lint:ignore U1000 false linter issue due to generics
+func (bh conversationsBackupHandler) getItem(
     ctx context.Context,
     groupID string,
     containerIDs path.Elements, // expects: [conversationID, threadID]
@@ -128,6 +129,14 @@ func (bh conversationsBackupHandler) GetItem(
         api.CallConfig{})
 }

+//lint:ignore U1000 false linter issue due to generics
+func (bh conversationsBackupHandler) augmentItemInfo(
+    dgi *details.GroupsInfo,
+    c models.Conversationable,
+) {
+    dgi.Post.Topic = ptr.Val(c.GetTopic())
+}
+
 func conversationThreadContainer(
     c models.Conversationable,
     t models.ConversationThreadable,
@@ -136,9 +145,8 @@ func conversationThreadContainer(
         storageDirFolders: path.Elements{ptr.Val(c.GetId()), ptr.Val(t.GetId())},
         // microsoft UX doesn't display any sort of container name that would make a reasonable
         // "location" for the posts in the conversation. We may need to revisit this, perhaps
-        // the subject is sufficiently acceptable. But at this time it's left empty so that
-        // we don't populate it with problematic data.
-        humanLocation:       path.Elements{},
+        // the subject (aka topic) is sufficiently acceptable.
+        humanLocation:       path.Elements{ptr.Val(c.GetTopic())},
         canMakeDeltaQueries: false,
         container:           c,
     }
@@ -25,13 +25,25 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface {
     getItemer[I]
     getContainerser[C]
     getContainerItemIDser
+    getItemAndAugmentInfoer[C, I]
     includeContainerer[C]
     canonicalPather
     canMakeDeltaQuerieser
 }

+type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface {
+    getItemer[I]
+    augmentItemInfoer[C]
+}
+
+type augmentItemInfoer[C graph.GetIDer] interface {
+    // augmentItemInfo completes the groupInfo population with any data
+    // owned by the container and not accessible to the item.
+    augmentItemInfo(*details.GroupsInfo, C)
+}
+
 type getItemer[I groupsItemer] interface {
-    GetItem(
+    getItem(
         ctx context.Context,
         protectedResource string,
         containerIDs path.Elements,
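The interface split above composes small capabilities: a handler qualifies as a getItemAndAugmentInfoer simply by having an item fetcher plus an info augmenter, which is how the channels and conversations handlers (and the test mock) plug into the generic Collection. A minimal, self-contained sketch of that composition is below; the types and names are illustrative stand-ins, with only the method shapes mirrored from the hunk.

package main

import (
	"context"
	"fmt"
)

// GetIDer stands in for graph.GetIDer: anything with an ID.
type GetIDer interface{ GetID() string }

// itemInfo stands in for *details.GroupsInfo.
type itemInfo struct{ Topic string }

// The two small capabilities composed by getItemAndAugmentInfoer in the diff.
type getItemer[I any] interface {
	getItem(ctx context.Context, resource, itemID string) (I, *itemInfo, error)
}

type augmentItemInfoer[C GetIDer] interface {
	// augmentItemInfo completes the info with data owned by the container.
	augmentItemInfo(*itemInfo, C)
}

type getItemAndAugmentInfoer[C GetIDer, I any] interface {
	getItemer[I]
	augmentItemInfoer[C]
}

// conversation is a toy container type.
type conversation struct{ id, topic string }

func (c conversation) GetID() string { return c.id }

// post is a toy item type.
type post struct{ id string }

// conversationsHandler satisfies both capabilities, so it can be passed
// wherever a getItemAndAugmentInfoer[conversation, post] is expected.
type conversationsHandler struct{}

func (conversationsHandler) getItem(_ context.Context, _, itemID string) (post, *itemInfo, error) {
	return post{id: itemID}, &itemInfo{}, nil
}

func (conversationsHandler) augmentItemInfo(info *itemInfo, c conversation) {
	info.Topic = c.topic
}

// Compile-time assertion in the same style the diff uses for mockBackupHandler.
var _ getItemAndAugmentInfoer[conversation, post] = conversationsHandler{}

func main() {
	var h getItemAndAugmentInfoer[conversation, post] = conversationsHandler{}

	p, info, _ := h.getItem(context.Background(), "group-1", "post-1")
	h.augmentItemInfo(info, conversation{id: "conv-1", topic: "weekly sync"})

	fmt.Println(p.id, info.Topic)
}

Keeping the interfaces this small is what lets the unexported test mock in collection_test.go implement only getItem and augmentItemInfo, the two methods the collection actually calls.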
@@ -1,27 +0,0 @@
-package mock
-
-import (
-    "context"
-
-    "github.com/microsoftgraph/msgraph-sdk-go/models"
-
-    "github.com/alcionai/corso/src/internal/common/ptr"
-    "github.com/alcionai/corso/src/pkg/backup/details"
-    "github.com/alcionai/corso/src/pkg/path"
-)
-
-type GetChannelMessage struct {
-    Err error
-}
-
-func (m GetChannelMessage) GetItem(
-    _ context.Context,
-    _ string,
-    _ path.Elements,
-    itemID string,
-) (models.ChatMessageable, *details.GroupsInfo, error) {
-    msg := models.NewChatMessage()
-    msg.SetId(ptr.To(itemID))
-
-    return msg, &details.GroupsInfo{}, m.Err
-}
@@ -65,7 +65,7 @@ type ConversationPostInfo struct {
     Creator string `json:"creator,omitempty"`
     Preview string `json:"preview,omitempty"`
     Size    int64  `json:"size,omitempty"`
-    Subject string `json:"subject,omitempty"`
+    Topic   string `json:"topic,omitempty"`
 }

 type ChannelMessageInfo struct {
@@ -86,6 +86,8 @@ func (i GroupsInfo) Headers() []string {
         return []string{"ItemName", "Library", "ParentPath", "Size", "Owner", "Created", "Modified"}
     case GroupsChannelMessage:
         return []string{"Message", "Channel", "Subject", "Replies", "Creator", "Created", "Last Reply"}
+    case GroupsConversationPost:
+        return []string{"Post", "Conversation", "Sender", "Created"}
     }

     return []string{}
@@ -112,7 +114,7 @@ func (i GroupsInfo) Values() []string {
         }

         return []string{
-            // html parsing may produce newlijnes, which we'll want to avoid
+            // html parsing may produce newlines, which we'll want to avoid
             strings.ReplaceAll(i.Message.Preview, "\n", "\\n"),
             i.ParentPath,
             i.Message.Subject,
@@ -121,6 +123,14 @@ func (i GroupsInfo) Values() []string {
             dttm.FormatToTabularDisplay(i.Message.CreatedAt),
             lastReply,
         }
+    case GroupsConversationPost:
+        return []string{
+            // html parsing may produce newlines, which we'll want to avoid
+            strings.ReplaceAll(i.Post.Preview, "\n", "\\n"),
+            i.Post.Topic,
+            i.Post.Creator,
+            dttm.FormatToTabularDisplay(i.Post.CreatedAt),
+        }
     }

     return []string{}
@@ -21,38 +21,95 @@ func TestGroupsUnitSuite(t *testing.T) {
 }

 func (suite *GroupsUnitSuite) TestGroupsPrintable() {
-    t := suite.T()
     now := time.Now()
     then := now.Add(time.Minute)

-    gi := details.GroupsInfo{
-        ItemType:   details.GroupsChannelMessage,
-        ParentPath: "parentPath",
-        Message: details.ChannelMessageInfo{
-            Preview:    "preview",
-            ReplyCount: 1,
-            Creator:    "creator",
-            CreatedAt:  now,
-            Subject:    "subject",
-        },
-        LastReply: details.ChannelMessageInfo{
-            CreatedAt: then,
-        },
-    }
-
-    expectVs := []string{
-        "preview",
-        "parentPath",
-        "subject",
-        "1",
-        "creator",
-        dttm.FormatToTabularDisplay(now),
-        dttm.FormatToTabularDisplay(then),
-    }
-
-    hs := gi.Headers()
-    vs := gi.Values()
-
-    assert.Equal(t, len(hs), len(vs))
-    assert.Equal(t, expectVs, vs)
+    table := []struct {
+        name     string
+        info     details.GroupsInfo
+        expectHs []string
+        expectVs []string
+    }{
+        {
+            name: "channel message",
+            info: details.GroupsInfo{
+                ItemType:   details.GroupsChannelMessage,
+                ParentPath: "parentpath",
+                Message: details.ChannelMessageInfo{
+                    Preview:    "preview",
+                    ReplyCount: 1,
+                    Creator:    "creator",
+                    CreatedAt:  now,
+                    Subject:    "subject",
+                },
+                LastReply: details.ChannelMessageInfo{
+                    CreatedAt: then,
+                },
+            },
+            expectHs: []string{"Message", "Channel", "Subject", "Replies", "Creator", "Created", "Last Reply"},
+            expectVs: []string{
+                "preview",
+                "parentpath",
+                "subject",
+                "1",
+                "creator",
+                dttm.FormatToTabularDisplay(now),
+                dttm.FormatToTabularDisplay(then),
+            },
+        },
+        {
+            name: "conversation post",
+            info: details.GroupsInfo{
+                ItemType: details.GroupsConversationPost,
+                Post: details.ConversationPostInfo{
+                    Preview:   "preview",
+                    Creator:   "creator",
+                    CreatedAt: now,
+                    Topic:     "topic",
+                },
+            },
+            expectHs: []string{"Post", "Conversation", "Sender", "Created"},
+            expectVs: []string{
+                "preview",
+                "topic",
+                "creator",
+                dttm.FormatToTabularDisplay(now),
+            },
+        },
+        {
+            name: "sharepoint library",
+            info: details.GroupsInfo{
+                ItemType:   details.SharePointLibrary,
+                ParentPath: "parentPath",
+                Created:    now,
+                Modified:   then,
+                DriveName:  "librarydrive",
+                ItemName:   "item",
+                Size:       42,
+                Owner:      "user",
+            },
+            expectHs: []string{"ItemName", "Library", "ParentPath", "Size", "Owner", "Created", "Modified"},
+            expectVs: []string{
+                "item",
+                "librarydrive",
+                "parentPath",
+                "42 B",
+                "user",
+                dttm.FormatToTabularDisplay(now),
+                dttm.FormatToTabularDisplay(then),
+            },
+        },
+    }
+
+    for _, test := range table {
+        suite.Run(test.name, func() {
+            t := suite.T()
+
+            hs := test.info.Headers()
+            vs := test.info.Values()
+
+            assert.Equal(t, len(hs), len(vs))
+            assert.Equal(t, test.expectHs, hs)
+            assert.Equal(t, test.expectVs, vs)
+        })
+    }
 }
@@ -217,9 +217,8 @@ func (s *groups) AllData() []GroupsScope {
     scopes = append(
         scopes,
         makeScope[GroupsScope](GroupsLibraryFolder, Any()),
-        makeScope[GroupsScope](GroupsChannel, Any()))
-        // TODO: enable conversations in all-data backups
-        // makeScope[GroupsScope](GroupsConversation, Any()))
+        makeScope[GroupsScope](GroupsChannel, Any()),
+        makeScope[GroupsScope](GroupsConversation, Any()))

     return scopes
 }
@@ -96,7 +96,7 @@ func (suite *ConversationsAPIUnitSuite) TestConversationPostInfo() {
             Preview: "",
             Size:    0,
             // TODO: feed the subject in from the conversation
-            Subject: "",
+            Topic:   "",
         },
     }
