Add metadata files for posts
This commit is contained in:
parent
b073c204c8
commit
777e6fba34
@ -2,6 +2,7 @@ package groups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -136,6 +137,15 @@ func (bh mockBackupHandler) getItem(
|
||||
return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID]
|
||||
}
|
||||
|
||||
func (bh mockBackupHandler) getItemMetadata(
|
||||
_ context.Context,
|
||||
_ models.Channelable,
|
||||
_ string,
|
||||
_ time.Time,
|
||||
) (io.ReadCloser, int, error) {
|
||||
return nil, 0, ErrMetadataFilesNotSupported
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unit Suite
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -2,6 +2,8 @@ package groups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -14,6 +16,8 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
|
||||
)
|
||||
|
||||
var ErrMetadataFilesNotSupported = clues.New("metadata files not supported")
|
||||
|
||||
var _ backupHandler[models.Channelable, models.ChatMessageable] = &channelsBackupHandler{}
|
||||
|
||||
type channelsBackupHandler struct {
|
||||
@ -105,6 +109,19 @@ func (bh channelsBackupHandler) getItem(
|
||||
return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID)
|
||||
}
|
||||
|
||||
// Channel messages don't carry metadata files. Return unsupported error.
|
||||
// Adding this method for interface compliance.
|
||||
//
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (bh channelsBackupHandler) getItemMetadata(
|
||||
_ context.Context,
|
||||
_ models.Channelable,
|
||||
_ string,
|
||||
_ time.Time,
|
||||
) (io.ReadCloser, int, error) {
|
||||
return nil, 0, ErrMetadataFilesNotSupported
|
||||
}
|
||||
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (bh channelsBackupHandler) augmentItemInfo(
|
||||
dgi *details.GroupsInfo,
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
kjson "github.com/microsoft/kiota-serialization-json-go"
|
||||
"github.com/spatialcurrent/go-lazy/pkg/lazy"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/m365/support"
|
||||
@ -33,6 +34,9 @@ var (
|
||||
const (
|
||||
collectionChannelBufferSize = 1000
|
||||
numberOfRetries = 4
|
||||
|
||||
metaFileSuffix = ".meta"
|
||||
dataFileSuffix = ".data"
|
||||
)
|
||||
|
||||
// updateStatus is a utility function used to send the status update through
|
||||
@ -370,6 +374,46 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
|
||||
"item_id", id,
|
||||
"parent_path", path.LoggableDir(col.LocationPath().String()))
|
||||
|
||||
// Handle metadata file first before adding data file.
|
||||
// TODO(pandeyabs): Mention why.
|
||||
itemMeta, itemMetaSize, err := col.getAndAugment.getItemMetadata(
|
||||
ictx,
|
||||
col.contains.container,
|
||||
id,
|
||||
modTime)
|
||||
if err != nil && !errors.Is(err, ErrMetadataFilesNotSupported) {
|
||||
err = clues.StackWC(ictx, err).Label(fault.LabelForceNoBackupCreation)
|
||||
el.AddRecoverable(ictx, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
progReader := observe.ItemProgress(
|
||||
ctx,
|
||||
itemMeta,
|
||||
observe.ItemBackupMsg,
|
||||
clues.Hide(id+metaFileSuffix),
|
||||
int64(itemMetaSize))
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
storeItem, err := data.NewPrefetchedItem(
|
||||
metaReader,
|
||||
id+metaFileSuffix,
|
||||
// Use same modTime as post's.
|
||||
modTime)
|
||||
if err != nil {
|
||||
errs.AddRecoverable(ctx, clues.StackWC(ctx, err).
|
||||
Label(fault.LabelForceNoBackupCreation))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
col.stream <- storeItem
|
||||
}
|
||||
|
||||
col.stream <- data.NewLazyItemWithInfo(
|
||||
ictx,
|
||||
&lazyItemGetter[C, I]{
|
||||
@ -381,7 +425,11 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
|
||||
contains: col.contains,
|
||||
parentPath: col.LocationPath().String(),
|
||||
},
|
||||
id,
|
||||
// TODO(pandeyabs): Persist as .data file for conversations only.
|
||||
// Not for channels, i.e. only add the .data suffix for conv backups.
|
||||
// This is safe for now since channels don't utilize lazy fetch collection
|
||||
// yet.
|
||||
id+dataFileSuffix,
|
||||
modTime,
|
||||
col.Counter,
|
||||
el)
|
||||
|
||||
@ -170,6 +170,16 @@ func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.C
|
||||
// no-op
|
||||
}
|
||||
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (getAndAugmentChannelMessage) getItemMetadata(
|
||||
_ context.Context,
|
||||
_ models.Channelable,
|
||||
_ string,
|
||||
_ time.Time,
|
||||
) (io.ReadCloser, int, error) {
|
||||
return nil, 0, ErrMetadataFilesNotSupported
|
||||
}
|
||||
|
||||
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
|
||||
var (
|
||||
t = suite.T()
|
||||
@ -297,6 +307,16 @@ func (m *getAndAugmentConversation) getItem(
|
||||
return p, &details.GroupsInfo{}, m.GetItemErr
|
||||
}
|
||||
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (m *getAndAugmentConversation) getItemMetadata(
|
||||
_ context.Context,
|
||||
_ models.Conversationable,
|
||||
_ string,
|
||||
_ time.Time,
|
||||
) (io.ReadCloser, int, error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
//
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) {
|
||||
|
||||
@ -1,7 +1,11 @@
|
||||
package groups
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -132,6 +136,42 @@ func (bh conversationsBackupHandler) getItem(
|
||||
postID)
|
||||
}
|
||||
|
||||
// ConversationPostMetadata contains metadata about a conversation post.
|
||||
// It gets stored in a separate file in kopia
|
||||
type ConversationPostMetadata struct {
|
||||
// TODO(pandeyabs): Remove this?
|
||||
PostID string `json:"postID,omitempty"`
|
||||
Recipients []string `json:"recipients,omitempty"`
|
||||
Topic string `json:"topic,omitempty"`
|
||||
// ReceivedTime time.Time `json:"receivedTime,omitempty"`
|
||||
// InReplyTo string `json:"inReplyTo,omitempty"`
|
||||
}
|
||||
|
||||
// func HasMetaSuffix(name string) bool {
|
||||
// return strings.HasSuffix(name, metaFileSuffix)
|
||||
// }
|
||||
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (bh conversationsBackupHandler) getItemMetadata(
|
||||
ctx context.Context,
|
||||
c models.Conversationable,
|
||||
itemID string,
|
||||
receivedTime time.Time,
|
||||
) (io.ReadCloser, int, error) {
|
||||
meta := ConversationPostMetadata{
|
||||
PostID: itemID,
|
||||
Recipients: []string{bh.resourceEmail},
|
||||
Topic: ptr.Val(c.GetTopic()),
|
||||
}
|
||||
|
||||
metaJSON, err := json.Marshal(meta)
|
||||
if err != nil {
|
||||
return nil, 0, clues.WrapWC(ctx, err, "serializing post metadata")
|
||||
}
|
||||
|
||||
return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
|
||||
}
|
||||
|
||||
//lint:ignore U1000 false linter issue due to generics
|
||||
func (bh conversationsBackupHandler) augmentItemInfo(
|
||||
dgi *details.GroupsInfo,
|
||||
|
||||
@ -2,6 +2,8 @@ package groups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/microsoft/kiota-abstractions-go/serialization"
|
||||
|
||||
@ -23,6 +25,8 @@ type groupsItemer interface {
|
||||
|
||||
type backupHandler[C graph.GetIDer, I groupsItemer] interface {
|
||||
getItemer[I]
|
||||
// TODO(pandeyabs): Do we need this duplication?
|
||||
getItemMetadataer[C, I]
|
||||
getContainerser[C]
|
||||
getContainerItemIDser
|
||||
getItemAndAugmentInfoer[C, I]
|
||||
@ -33,6 +37,7 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface {
|
||||
|
||||
type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface {
|
||||
getItemer[I]
|
||||
getItemMetadataer[C, I]
|
||||
augmentItemInfoer[C]
|
||||
}
|
||||
|
||||
@ -51,6 +56,15 @@ type getItemer[I groupsItemer] interface {
|
||||
) (I, *details.GroupsInfo, error)
|
||||
}
|
||||
|
||||
type getItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
|
||||
getItemMetadata(
|
||||
ctx context.Context,
|
||||
c C,
|
||||
itemID string,
|
||||
receivedTime time.Time,
|
||||
) (io.ReadCloser, int, error)
|
||||
}
|
||||
|
||||
// gets all containers for the resource
|
||||
type getContainerser[C graph.GetIDer] interface {
|
||||
getContainers(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user