merge commit

This commit is contained in:
neha-Gupta1 2023-08-22 14:45:22 +05:30
commit 8902e64487
11 changed files with 586 additions and 21 deletions

View File

@ -439,7 +439,7 @@ func (c *Collections) Get(
service, category := c.handler.ServiceCat()
md, err := graph.MakeMetadataCollection(
c.tenantID,
c.resourceOwner,
c.resourceOwner, // TODO(meain): path fixes: group id
service,
category,
[]graph.MetadataCollectionEntry{

View File

@ -75,8 +75,6 @@ func CreateCollections(
return nil, clues.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, coll := range collections {
allCollections = append(allCollections, coll)
}

View File

@ -39,8 +39,7 @@ const (
// Collection implements the interface from data.Collection
// Structure holds data for an Exchange application for a single user
type Collection struct {
// M365 user
user string // M365 user
user string
data chan data.Item
// added is a list of existing item IDs that were added to a container

View File

@ -0,0 +1,318 @@
package groups
import (
"context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
// TODO: incremental support
// multiple lines in this file are commented out so that
// we can focus on v0 backups and re-integrate them later
// for v1 incrementals.
// since these lines represent otherwise standard boilerplate,
// it's simpler to comment them for tracking than to delete
// and re-discover them later.
func CreateCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
handler BackupHandler,
tenantID string,
scope selectors.GroupsScope,
// dps DeltaPaths,
su support.StatusUpdater,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
var (
allCollections = make([]data.BackupCollection, 0)
category = scope.Category().PathType()
qp = graph.QueryParams{
Category: category,
ProtectedResource: bpc.ProtectedResource,
TenantID: tenantID,
}
)
catProgress := observe.MessageWithCompletion(
ctx,
observe.Bulletf("%s", qp.Category))
defer close(catProgress)
// TODO(keepers): probably shouldn't call out channels here specifically.
// This should be a generic container handler. But we don't need
// to worry about that until if/when we use this code to get email
// conversations as well.
// Also, this should be produced by the Handler.
// chanPager := handler.NewChannelsPager(qp.ProtectedResource.ID())
// TODO(neha): enumerate channels
channels := []graph.Displayable{}
collections, err := populateCollections(
ctx,
qp,
handler,
su,
channels,
scope,
// dps,
bpc.Options,
errs)
if err != nil {
return nil, clues.Wrap(err, "filling collections")
}
for _, coll := range collections {
allCollections = append(allCollections, coll)
}
return allCollections, nil
}
func populateCollections(
ctx context.Context,
qp graph.QueryParams,
bh BackupHandler,
statusUpdater support.StatusUpdater,
channels []graph.Displayable,
scope selectors.GroupsScope,
// dps DeltaPaths,
ctrlOpts control.Options,
errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
// channel ID -> BackupCollection.
channelCollections := map[string]data.BackupCollection{}
// channel ID -> delta url or folder path lookups
// TODO(neha/keepers): figure out if deltas are stored per channel, or per group.
// deltaURLs = map[string]string{}
// currPaths = map[string]string{}
// copy of previousPaths. every channel present in the slice param
// gets removed from this map; the remaining channels at the end of
// the process have been deleted.
// tombstones = makeTombstones(dps)
logger.Ctx(ctx).Infow("filling collections")
// , "len_deltapaths", len(dps))
el := errs.Local()
for _, c := range channels {
if el.Failure() != nil {
return nil, el.Failure()
}
cID := ptr.Val(c.GetId())
// delete(tombstones, cID)
var (
err error
// dp = dps[cID]
// prevDelta = dp.Delta
// prevPathStr = dp.Path // do not log: pii; log prevPath instead
// prevPath path.Path
ictx = clues.Add(
ctx,
"channel_id", cID)
// "previous_delta", pii.SafeURL{
// URL: prevDelta,
// SafePathElems: graph.SafeURLPathParams,
// SafeQueryKeys: graph.SafeURLQueryParams,
// })
)
// currPath, locPath
// TODO(rkeepers): the handler should provide this functionality.
// Only create a collection if the path matches the scope.
if !includeContainer(ictx, qp, c, scope, qp.Category) {
continue
}
// if len(prevPathStr) > 0 {
// if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
// logger.CtxErr(ictx, err).Error("parsing prev path")
// // if the previous path is unusable, then the delta must be, too.
// prevDelta = ""
// }
// }
// ictx = clues.Add(ictx, "previous_path", prevPath)
// TODO: the handler should provide this implementation.
items, err := collectItems(
ctx,
bh.NewMessagePager(qp.ProtectedResource.ID(), ptr.Val(c.GetId())))
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
continue
}
// if len(newDelta.URL) > 0 {
// deltaURLs[cID] = newDelta.URL
// } else if !newDelta.Reset {
// logger.Ctx(ictx).Info("missing delta url")
// }
var prevPath path.Path
// TODO: retrieve from handler
currPath, err := path.Builder{}.
Append(ptr.Val(c.GetId())).
ToDataLayerPath(
qp.TenantID,
qp.ProtectedResource.ID(),
path.GroupsService,
qp.Category,
true)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
continue
}
edc := NewCollection(
qp.ProtectedResource.ID(),
currPath,
prevPath,
path.Builder{}.Append(ptr.Val(c.GetDisplayName())),
qp.Category,
statusUpdater,
ctrlOpts)
channelCollections[cID] = &edc
// TODO: handle deleted items for v1 backup.
// // Remove any deleted IDs from the set of added IDs because items that are
// // deleted and then restored will have a different ID than they did
// // originally.
// for _, remove := range removed {
// delete(edc.added, remove)
// edc.removed[remove] = struct{}{}
// }
// // add the current path for the container ID to be used in the next backup
// // as the "previous path", for reference in case of a rename or relocation.
// currPaths[cID] = currPath.String()
// FIXME: normally this goes before removal, but linters
for _, item := range items {
edc.added[ptr.Val(item.GetId())] = struct{}{}
}
}
// TODO: handle tombstones here
logger.Ctx(ctx).Infow(
"adding metadata collection entries",
// "num_deltas_entries", len(deltaURLs),
"num_paths_entries", len(channelCollections))
// col, err := graph.MakeMetadataCollection(
// qp.TenantID,
// qp.ProtectedResource.ID(),
// path.ExchangeService,
// qp.Category,
// []graph.MetadataCollectionEntry{
// graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
// graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
// },
// statusUpdater)
// if err != nil {
// return nil, clues.Wrap(err, "making metadata collection")
// }
// channelCollections["metadata"] = col
return channelCollections, el.Failure()
}
func collectItems(
ctx context.Context,
pager api.ChannelMessageDeltaEnumerator,
) ([]models.ChatMessageable, error) {
items := []models.ChatMessageable{}
for {
// assume delta urls here, which allows single-token consumption
page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC))
if err != nil {
return nil, graph.Wrap(ctx, err, "getting page")
}
// if graph.IsErrInvalidDelta(err) {
// logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta)
// invalidPrevDelta = true
// newPaths = map[string]string{}
// pager.Reset()
// continue
// }
vals, err := pager.ValuesIn(page)
if err != nil {
return nil, graph.Wrap(ctx, err, "getting items in page")
}
items = append(items, vals...)
nextLink, _ := api.NextAndDeltaLink(page)
// if len(deltaLink) > 0 {
// newDeltaURL = deltaLink
// }
// Check if there are more items
if len(nextLink) == 0 {
break
}
logger.Ctx(ctx).Debugw("found nextLink", "next_link", nextLink)
pager.SetNext(nextLink)
}
return items, nil
}
// Returns true if the container passes the scope comparison and should be included.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
ctx context.Context,
qp graph.QueryParams,
gd graph.Displayable,
scope selectors.GroupsScope,
category path.CategoryType,
) bool {
// assume a single-level hierarchy
directory := ptr.Val(gd.GetDisplayName())
// TODO(keepers): awaiting parent branch to update to main
ok := scope.Matches(selectors.GroupsCategoryUnknown, directory)
logger.Ctx(ctx).With(
"included", ok,
"scope", scope,
"match_target", directory,
).Debug("backup folder selection filter")
return ok
}

View File

@ -0,0 +1,180 @@
package groups
import (
"bytes"
"context"
"io"
"time"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
var (
_ data.BackupCollection = &Collection{}
_ data.Item = &Item{}
_ data.ItemInfo = &Item{}
_ data.ItemModTime = &Item{}
)
const (
collectionChannelBufferSize = 1000
numberOfRetries = 4
)
type Collection struct {
protectedResource string
items chan data.Item
// added is a list of existing item IDs that were added to a container
added map[string]struct{}
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
// items itemGetterSerializer
category path.CategoryType
statusUpdater support.StatusUpdater
ctrl control.Options
// FullPath is the current hierarchical path used by this collection.
fullPath path.Path
// PrevPath is the previous hierarchical path used by this collection.
// It may be the same as fullPath, if the folder was not renamed or
// moved. It will be empty on its first retrieval.
prevPath path.Path
// LocationPath contains the path with human-readable display names.
// IE: "/Inbox/Important" instead of "/abcdxyz123/algha=lgkhal=t"
locationPath *path.Builder
state data.CollectionState
// doNotMergeItems should only be true if the old delta token expired.
// doNotMergeItems bool
}
// NewExchangeDataCollection creates an ExchangeDataCollection.
// State of the collection is set as an observation of the current
// and previous paths. If the curr path is nil, the state is assumed
// to be deleted. If the prev path is nil, it is assumed newly created.
// If both are populated, then state is either moved (if they differ),
// or notMoved (if they match).
func NewCollection(
protectedResource string,
curr, prev path.Path,
location *path.Builder,
category path.CategoryType,
statusUpdater support.StatusUpdater,
ctrlOpts control.Options,
// doNotMergeItems bool,
) Collection {
collection := Collection{
added: make(map[string]struct{}, 0),
category: category,
ctrl: ctrlOpts,
items: make(chan data.Item, collectionChannelBufferSize),
// doNotMergeItems: doNotMergeItems,
fullPath: curr,
locationPath: location,
prevPath: prev,
removed: make(map[string]struct{}, 0),
state: data.StateOf(prev, curr),
statusUpdater: statusUpdater,
protectedResource: protectedResource,
}
return collection
}
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
// go col.streamItems(ctx, errs)
return col.items
}
// FullPath returns the Collection's fullPath []string
func (col *Collection) FullPath() path.Path {
return col.fullPath
}
// LocationPath produces the Collection's full path, but with display names
// instead of IDs in the folders. Only populated for Calendars.
func (col *Collection) LocationPath() *path.Builder {
return col.locationPath
}
// TODO(ashmrtn): Fill in with previous path once the Controller compares old
// and new folder hierarchies.
func (col Collection) PreviousPath() path.Path {
return col.prevPath
}
func (col Collection) State() data.CollectionState {
return col.state
}
func (col Collection) DoNotMergeItems() bool {
// TODO: depends on whether or not deltas are valid
return true
}
// ---------------------------------------------------------------------------
// items
// ---------------------------------------------------------------------------
// Item represents a single item retrieved from exchange
type Item struct {
id string
// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
message []byte
info *details.ExchangeInfo // temporary change to bring populate function into directory
// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
// request to provide modtime in ItemInfo structs.
modTime time.Time
// true if the item was marked by graph as deleted.
deleted bool
}
func (i *Item) ID() string {
return i.id
}
func (i *Item) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(i.message))
}
func (i Item) Deleted() bool {
return i.deleted
}
func (i *Item) Info() details.ItemInfo {
return details.ItemInfo{Exchange: i.info}
}
func (i *Item) ModTime() time.Time {
return i.modTime
}
func NewItem(
identifier string,
dataBytes []byte,
detail details.ExchangeInfo,
modTime time.Time,
) Item {
return Item{
id: identifier,
message: dataBytes,
info: &detail,
modTime: modTime,
}
}

View File

@ -0,0 +1,33 @@
package groups
import (
"context"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type BackupHandler interface {
GetChannelByID(
ctx context.Context,
teamID, channelID string,
) (models.Channelable, error)
NewChannelsPager(
teamID string,
) api.ChannelDeltaEnumerator
GetMessageByID(
ctx context.Context,
teamID, channelID, itemID string,
) (models.ChatMessageable, error)
NewMessagePager(
teamID, channelID string,
) api.ChannelMessageDeltaEnumerator
GetMessageReplies(
ctx context.Context,
teamID, channelID, messageID string,
) (serialization.Parsable, error)
}

View File

@ -350,6 +350,14 @@ func (pb Builder) ToDataLayerSharePointPath(
return pb.ToDataLayerPath(tenant, site, SharePointService, category, isItem)
}
func (pb Builder) ToDataLayerGroupPath(
tenant, group string,
category CategoryType,
isItem bool,
) (Path, error) {
return pb.ToDataLayerPath(tenant, group, GroupsService, category, isItem)
}
// ---------------------------------------------------------------------------
// Stringers and PII Concealer Compliance
// ---------------------------------------------------------------------------

View File

@ -82,7 +82,7 @@ var serviceCategories = map[ServiceType]map[CategoryType]struct{}{
},
TeamsService: {
ChannelMessagesCategory: {},
LibrariesCategory: {},
LibrariesCategory: {}, // TODO(meain)
},
}

View File

@ -329,14 +329,24 @@ const (
GroupsGroup groupsCategory = "GroupsGroup"
GroupsChannel groupsCategory = "GroupsChannel"
GroupsChannelMessage groupsCategory = "GroupsChannelMessage"
GroupsLibraryFolder groupsCategory = "GroupsLibraryFolder"
GroupsLibraryItem groupsCategory = "GroupsLibraryItem"
// details.itemInfo comparables
// channel drive selection
GroupsInfoSiteLibraryDrive groupsCategory = "GroupsInfoSiteLibraryDrive"
GroupsInfoChannel groupsCategory = "GroupsInfoChannel"
GroupsInfoChannel groupsCategory = "GroupsInfoChannel"
// sharepoint
GroupsLibraryFolder groupsCategory = "GroupsLibraryFolder"
GroupsLibraryItem groupsCategory = "GroupsLibraryItem"
// messages
// GroupsTeamChannel groupsCategory = "GroupsTeamChannel"
// GroupsTeamChannelMessages groupsCategory = "GroupsTeamChannelMessages"
// details.itemInfo comparables
// library drive selection
GroupsInfoSiteLibraryDrive groupsCategory = "GroupsInfoSiteLibraryDrive" // TODO(meain)
)
// groupsLeafProperties describes common metadata of the leaf categories
@ -424,7 +434,7 @@ func (c groupsCategory) pathValues(
}
folderCat, itemCat = GroupsLibraryFolder, GroupsLibraryItem
rFld = ent.Groups.ParentPath
rFld = ent.Groups.ParentPath // TODO(meain)
default:
return nil, clues.New("unrecognized groupsCategory").With("category", c)

View File

@ -1,19 +1,19 @@
package api
import (
"context"
)
import "github.com/microsoftgraph/msgraph-sdk-go/models"
// ---------------------------------------------------------------------------
// item pager
// ---------------------------------------------------------------------------
type MessageItemDeltaEnumerator interface {
GetPage(context.Context) (DeltaPageLinker, error)
type ChannelMessageDeltaEnumerator interface {
DeltaGetPager
ValuesInPageLinker[models.ChatMessageable]
SetNextLinker
}
// TODO: implement
// var _ MessageItemDeltaEnumerator = &messagePageCtrl{}
// var _ ChannelMessageDeltaEnumerator = &messagePageCtrl{}
// type messagePageCtrl struct {
// gs graph.Servicer
@ -25,12 +25,14 @@ type MessageItemDeltaEnumerator interface {
// channel pager
// ---------------------------------------------------------------------------
type ChannelItemDeltaEnumerator interface {
GetPage(context.Context) (DeltaPageLinker, error)
type ChannelDeltaEnumerator interface {
DeltaGetPager
ValuesInPageLinker[models.Channelable]
SetNextLinker
}
// TODO: implement
// var _ ChannelsItemDeltaEnumerator = &channelsPageCtrl{}
// var _ ChannelDeltaEnumerator = &channelsPageCtrl{}
// type channelsPageCtrl struct {
// gs graph.Servicer

View File

@ -13,9 +13,18 @@ import (
)
// ---------------------------------------------------------------------------
// common interfaces and funcs
// common interfaces
// ---------------------------------------------------------------------------
// TODO(keepers): replace all matching uses of GetPage with this.
type DeltaGetPager interface {
GetPage(context.Context) (DeltaPageLinker, error)
}
type ValuesInPageLinker[T any] interface {
ValuesIn(PageLinker) ([]T, error)
}
type PageLinker interface {
GetOdataNextLink() *string
}
@ -25,6 +34,14 @@ type DeltaPageLinker interface {
GetOdataDeltaLink() *string
}
type SetNextLinker interface {
SetNext(nextLink string)
}
// ---------------------------------------------------------------------------
// common funcs
// ---------------------------------------------------------------------------
// IsNextLinkValid separate check to investigate whether error is
func IsNextLinkValid(next string) bool {
return !strings.Contains(next, `users//`)