From 06862c3b8c4eb80d2a3e92da2f58d5141b98dd51 Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 22 Aug 2023 11:25:06 -0600 Subject: [PATCH] add boilerplate groups backup collection (#4082) Adds the boilerplate for groups backup collection processing. Not necessarily functional at this time, due to missing dependencies and consts that aren't yet in the branch. Thus the lack of tests. It's just good enough to keep progress rolling forward. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature #### Issue(s) * #3989 #### Test Plan --- .../m365/collection/exchange/backup.go | 2 - .../m365/collection/exchange/collection.go | 3 +- src/internal/m365/collection/groups/backup.go | 318 ++++++++++++++++++ .../m365/collection/groups/collection.go | 180 ++++++++++ .../m365/collection/groups/handler.go | 18 - .../m365/collection/groups/handlers.go | 33 ++ src/pkg/services/m365/api/channels_pager.go | 18 +- src/pkg/services/m365/api/item_pager.go | 19 +- 8 files changed, 561 insertions(+), 30 deletions(-) create mode 100644 src/internal/m365/collection/groups/backup.go create mode 100644 src/internal/m365/collection/groups/collection.go delete mode 100644 src/internal/m365/collection/groups/handler.go create mode 100644 src/internal/m365/collection/groups/handlers.go diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index 359701629..f5ebd1783 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -75,8 +75,6 @@ func CreateCollections( return nil, clues.Wrap(err, "filling collections") } - foldersComplete <- struct{}{} - for _, coll := range collections { allCollections = append(allCollections, coll) } diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index 8e0c0f897..ba421763c 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -39,8 +39,7 @@ const ( // Collection implements the interface from data.Collection // Structure holds data for an Exchange application for a single user type Collection struct { - // M365 user - user string // M365 user + user string data chan data.Item // added is a list of existing item IDs that were added to a container diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go new file mode 100644 index 000000000..9b31126a1 --- /dev/null +++ b/src/internal/m365/collection/groups/backup.go @@ -0,0 +1,318 @@ +package groups + +import ( + "context" + + "github.com/alcionai/clues" + "github.com/microsoftgraph/msgraph-sdk-go/models" + + "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" + "github.com/alcionai/corso/src/pkg/services/m365/api" +) + +// TODO: incremental support +// multiple lines in this file are commented out so that +// we can focus on v0 backups and re-integrate them later +// for v1 incrementals. 
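+// the commented-out pieces cover the delta token, previous-path,
+// and tombstone bookkeeping, plus the metadata collection that
+// persists them between backups.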
+// since these lines represent otherwise standard boilerplate, +// it's simpler to comment them for tracking than to delete +// and re-discover them later. + +func CreateCollections( + ctx context.Context, + bpc inject.BackupProducerConfig, + handler BackupHandler, + tenantID string, + scope selectors.GroupsScope, + // dps DeltaPaths, + su support.StatusUpdater, + errs *fault.Bus, +) ([]data.BackupCollection, error) { + ctx = clues.Add(ctx, "category", scope.Category().PathType()) + + var ( + allCollections = make([]data.BackupCollection, 0) + category = scope.Category().PathType() + qp = graph.QueryParams{ + Category: category, + ProtectedResource: bpc.ProtectedResource, + TenantID: tenantID, + } + ) + + catProgress := observe.MessageWithCompletion( + ctx, + observe.Bulletf("%s", qp.Category)) + defer close(catProgress) + + // TODO(keepers): probably shouldn't call out channels here specifically. + // This should be a generic container handler. But we don't need + // to worry about that until if/when we use this code to get email + // conversations as well. + // Also, this should be produced by the Handler. + // chanPager := handler.NewChannelsPager(qp.ProtectedResource.ID()) + // TODO(neha): enumerate channels + channels := []graph.Displayable{} + + collections, err := populateCollections( + ctx, + qp, + handler, + su, + channels, + scope, + // dps, + bpc.Options, + errs) + if err != nil { + return nil, clues.Wrap(err, "filling collections") + } + + for _, coll := range collections { + allCollections = append(allCollections, coll) + } + + return allCollections, nil +} + +func populateCollections( + ctx context.Context, + qp graph.QueryParams, + bh BackupHandler, + statusUpdater support.StatusUpdater, + channels []graph.Displayable, + scope selectors.GroupsScope, + // dps DeltaPaths, + ctrlOpts control.Options, + errs *fault.Bus, +) (map[string]data.BackupCollection, error) { + // channel ID -> BackupCollection. + channelCollections := map[string]data.BackupCollection{} + + // channel ID -> delta url or folder path lookups + // TODO(neha/keepers): figure out if deltas are stored per channel, or per group. + // deltaURLs = map[string]string{} + // currPaths = map[string]string{} + // copy of previousPaths. every channel present in the slice param + // gets removed from this map; the remaining channels at the end of + // the process have been deleted. + // tombstones = makeTombstones(dps) + + logger.Ctx(ctx).Infow("filling collections") + // , "len_deltapaths", len(dps)) + + el := errs.Local() + + for _, c := range channels { + if el.Failure() != nil { + return nil, el.Failure() + } + + cID := ptr.Val(c.GetId()) + // delete(tombstones, cID) + + var ( + err error + // dp = dps[cID] + // prevDelta = dp.Delta + // prevPathStr = dp.Path // do not log: pii; log prevPath instead + // prevPath path.Path + ictx = clues.Add( + ctx, + "channel_id", cID) + // "previous_delta", pii.SafeURL{ + // URL: prevDelta, + // SafePathElems: graph.SafeURLPathParams, + // SafeQueryKeys: graph.SafeURLQueryParams, + // }) + ) + + // currPath, locPath + // TODO(rkeepers): the handler should provide this functionality. + // Only create a collection if the path matches the scope. + if !includeContainer(ictx, qp, c, scope, qp.Category) { + continue + } + + // if len(prevPathStr) > 0 { + // if prevPath, err = pathFromPrevString(prevPathStr); err != nil { + // logger.CtxErr(ictx, err).Error("parsing prev path") + // // if the previous path is unusable, then the delta must be, too. 
+ // prevDelta = "" + // } + // } + + // ictx = clues.Add(ictx, "previous_path", prevPath) + + // TODO: the handler should provide this implementation. + items, err := collectItems( + ctx, + bh.NewMessagePager(qp.ProtectedResource.ID(), ptr.Val(c.GetId()))) + if err != nil { + el.AddRecoverable(ctx, clues.Stack(err)) + continue + } + + // if len(newDelta.URL) > 0 { + // deltaURLs[cID] = newDelta.URL + // } else if !newDelta.Reset { + // logger.Ctx(ictx).Info("missing delta url") + // } + + var prevPath path.Path + + // TODO: retrieve from handler + currPath, err := path.Builder{}. + Append(ptr.Val(c.GetId())). + ToDataLayerPath( + qp.TenantID, + qp.ProtectedResource.ID(), + path.GroupsService, + qp.Category, + true) + if err != nil { + el.AddRecoverable(ctx, clues.Stack(err)) + continue + } + + edc := NewCollection( + qp.ProtectedResource.ID(), + currPath, + prevPath, + path.Builder{}.Append(ptr.Val(c.GetDisplayName())), + qp.Category, + statusUpdater, + ctrlOpts) + + channelCollections[cID] = &edc + + // TODO: handle deleted items for v1 backup. + // // Remove any deleted IDs from the set of added IDs because items that are + // // deleted and then restored will have a different ID than they did + // // originally. + // for _, remove := range removed { + // delete(edc.added, remove) + // edc.removed[remove] = struct{}{} + // } + + // // add the current path for the container ID to be used in the next backup + // // as the "previous path", for reference in case of a rename or relocation. + // currPaths[cID] = currPath.String() + + // FIXME: normally this goes before removal, but linters + for _, item := range items { + edc.added[ptr.Val(item.GetId())] = struct{}{} + } + } + + // TODO: handle tombstones here + + logger.Ctx(ctx).Infow( + "adding metadata collection entries", + // "num_deltas_entries", len(deltaURLs), + "num_paths_entries", len(channelCollections)) + + // col, err := graph.MakeMetadataCollection( + // qp.TenantID, + // qp.ProtectedResource.ID(), + // path.ExchangeService, + // qp.Category, + // []graph.MetadataCollectionEntry{ + // graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths), + // graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs), + // }, + // statusUpdater) + // if err != nil { + // return nil, clues.Wrap(err, "making metadata collection") + // } + + // channelCollections["metadata"] = col + + return channelCollections, el.Failure() +} + +func collectItems( + ctx context.Context, + pager api.ChannelMessageDeltaEnumerator, +) ([]models.ChatMessageable, error) { + items := []models.ChatMessageable{} + + for { + // assume delta urls here, which allows single-token consumption + page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC)) + if err != nil { + return nil, graph.Wrap(ctx, err, "getting page") + } + + // if graph.IsErrInvalidDelta(err) { + // logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta) + + // invalidPrevDelta = true + // newPaths = map[string]string{} + + // pager.Reset() + + // continue + // } + + vals, err := pager.ValuesIn(page) + if err != nil { + return nil, graph.Wrap(ctx, err, "getting items in page") + } + + items = append(items, vals...) 
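+
+		// Graph delta queries are paged: intermediate pages carry a nextLink,
+		// and only the final page carries a deltaLink. The deltaLink is
+		// ignored for now; it becomes relevant again with incremental support.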
+ + nextLink, _ := api.NextAndDeltaLink(page) + + // if len(deltaLink) > 0 { + // newDeltaURL = deltaLink + // } + + // Check if there are more items + if len(nextLink) == 0 { + break + } + + logger.Ctx(ctx).Debugw("found nextLink", "next_link", nextLink) + pager.SetNext(nextLink) + } + + return items, nil +} + +// Returns true if the container passes the scope comparison and should be included. +// Returns: +// - the path representing the directory as it should be stored in the repository. +// - the human-readable path using display names. +// - true if the path passes the scope comparison. +func includeContainer( + ctx context.Context, + qp graph.QueryParams, + gd graph.Displayable, + scope selectors.GroupsScope, + category path.CategoryType, +) bool { + // assume a single-level hierarchy + directory := ptr.Val(gd.GetDisplayName()) + + // TODO(keepers): awaiting parent branch to update to main + ok := scope.Matches(selectors.GroupsCategoryUnknown, directory) + + logger.Ctx(ctx).With( + "included", ok, + "scope", scope, + "match_target", directory, + ).Debug("backup folder selection filter") + + return ok +} diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go new file mode 100644 index 000000000..c1e6a4042 --- /dev/null +++ b/src/internal/m365/collection/groups/collection.go @@ -0,0 +1,180 @@ +package groups + +import ( + "bytes" + "context" + "io" + "time" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +var ( + _ data.BackupCollection = &Collection{} + _ data.Item = &Item{} + _ data.ItemInfo = &Item{} + _ data.ItemModTime = &Item{} +) + +const ( + collectionChannelBufferSize = 1000 + numberOfRetries = 4 +) + +type Collection struct { + protectedResource string + items chan data.Item + + // added is a list of existing item IDs that were added to a container + added map[string]struct{} + // removed is a list of item IDs that were deleted from, or moved out, of a container + removed map[string]struct{} + + // items itemGetterSerializer + + category path.CategoryType + statusUpdater support.StatusUpdater + ctrl control.Options + + // FullPath is the current hierarchical path used by this collection. + fullPath path.Path + + // PrevPath is the previous hierarchical path used by this collection. + // It may be the same as fullPath, if the folder was not renamed or + // moved. It will be empty on its first retrieval. + prevPath path.Path + + // LocationPath contains the path with human-readable display names. + // IE: "/Inbox/Important" instead of "/abcdxyz123/algha=lgkhal=t" + locationPath *path.Builder + + state data.CollectionState + + // doNotMergeItems should only be true if the old delta token expired. + // doNotMergeItems bool +} + +// NewExchangeDataCollection creates an ExchangeDataCollection. +// State of the collection is set as an observation of the current +// and previous paths. If the curr path is nil, the state is assumed +// to be deleted. If the prev path is nil, it is assumed newly created. +// If both are populated, then state is either moved (if they differ), +// or notMoved (if they match). 
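+//
+// For example (illustrative only):
+//	curr == nil             => deleted
+//	prev == nil             => new
+//	curr != prev, both set  => moved
+//	curr == prev, both set  => notMoved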
+func NewCollection( + protectedResource string, + curr, prev path.Path, + location *path.Builder, + category path.CategoryType, + statusUpdater support.StatusUpdater, + ctrlOpts control.Options, + // doNotMergeItems bool, +) Collection { + collection := Collection{ + added: make(map[string]struct{}, 0), + category: category, + ctrl: ctrlOpts, + items: make(chan data.Item, collectionChannelBufferSize), + // doNotMergeItems: doNotMergeItems, + fullPath: curr, + locationPath: location, + prevPath: prev, + removed: make(map[string]struct{}, 0), + state: data.StateOf(prev, curr), + statusUpdater: statusUpdater, + protectedResource: protectedResource, + } + + return collection +} + +// Items utility function to asynchronously execute process to fill data channel with +// M365 exchange objects and returns the data channel +func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item { + // go col.streamItems(ctx, errs) + return col.items +} + +// FullPath returns the Collection's fullPath []string +func (col *Collection) FullPath() path.Path { + return col.fullPath +} + +// LocationPath produces the Collection's full path, but with display names +// instead of IDs in the folders. Only populated for Calendars. +func (col *Collection) LocationPath() *path.Builder { + return col.locationPath +} + +// TODO(ashmrtn): Fill in with previous path once the Controller compares old +// and new folder hierarchies. +func (col Collection) PreviousPath() path.Path { + return col.prevPath +} + +func (col Collection) State() data.CollectionState { + return col.state +} + +func (col Collection) DoNotMergeItems() bool { + // TODO: depends on whether or not deltas are valid + return true +} + +// --------------------------------------------------------------------------- +// items +// --------------------------------------------------------------------------- + +// Item represents a single item retrieved from exchange +type Item struct { + id string + // TODO: We may need this to be a "oneOf" of `message`, `contact`, etc. + // going forward. Using []byte for now but I assume we'll have + // some structured type in here (serialization to []byte can be done in `Read`) + message []byte + info *details.ExchangeInfo // temporary change to bring populate function into directory + // TODO(ashmrtn): Can probably eventually be sourced from info as there's a + // request to provide modtime in ItemInfo structs. + modTime time.Time + + // true if the item was marked by graph as deleted. 
+ deleted bool +} + +func (i *Item) ID() string { + return i.id +} + +func (i *Item) ToReader() io.ReadCloser { + return io.NopCloser(bytes.NewReader(i.message)) +} + +func (i Item) Deleted() bool { + return i.deleted +} + +func (i *Item) Info() details.ItemInfo { + return details.ItemInfo{Exchange: i.info} +} + +func (i *Item) ModTime() time.Time { + return i.modTime +} + +func NewItem( + identifier string, + dataBytes []byte, + detail details.ExchangeInfo, + modTime time.Time, +) Item { + return Item{ + id: identifier, + message: dataBytes, + info: &detail, + modTime: modTime, + } +} diff --git a/src/internal/m365/collection/groups/handler.go b/src/internal/m365/collection/groups/handler.go deleted file mode 100644 index d4a382149..000000000 --- a/src/internal/m365/collection/groups/handler.go +++ /dev/null @@ -1,18 +0,0 @@ -package groups - -import ( - "context" - - "github.com/microsoft/kiota-abstractions-go/serialization" - "github.com/microsoftgraph/msgraph-sdk-go/models" - - "github.com/alcionai/corso/src/pkg/services/m365/api" -) - -type BackupMessagesHandler interface { - GetMessageByID(ctx context.Context, teamID, channelID, itemID string) (models.ChatMessageable, error) - NewMessagePager(teamID, channelID string) api.MessageItemDeltaEnumerator - GetChannelByID(ctx context.Context, teamID, channelID string) (models.Channelable, error) - NewChannelPager(teamID, channelID string) api.ChannelItemDeltaEnumerator - GetReplyByID(ctx context.Context, teamID, channelID, messageID string) (serialization.Parsable, error) -} diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go new file mode 100644 index 000000000..f5a28fd28 --- /dev/null +++ b/src/internal/m365/collection/groups/handlers.go @@ -0,0 +1,33 @@ +package groups + +import ( + "context" + + "github.com/microsoft/kiota-abstractions-go/serialization" + "github.com/microsoftgraph/msgraph-sdk-go/models" + + "github.com/alcionai/corso/src/pkg/services/m365/api" +) + +type BackupHandler interface { + GetChannelByID( + ctx context.Context, + teamID, channelID string, + ) (models.Channelable, error) + NewChannelsPager( + teamID string, + ) api.ChannelDeltaEnumerator + + GetMessageByID( + ctx context.Context, + teamID, channelID, itemID string, + ) (models.ChatMessageable, error) + NewMessagePager( + teamID, channelID string, + ) api.ChannelMessageDeltaEnumerator + + GetMessageReplies( + ctx context.Context, + teamID, channelID, messageID string, + ) (serialization.Parsable, error) +} diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go index 599c09649..58aecaf6c 100644 --- a/src/pkg/services/m365/api/channels_pager.go +++ b/src/pkg/services/m365/api/channels_pager.go @@ -1,19 +1,21 @@ package api import ( - "context" + "github.com/microsoftgraph/msgraph-sdk-go/models" ) // --------------------------------------------------------------------------- // item pager // --------------------------------------------------------------------------- -type MessageItemDeltaEnumerator interface { - GetPage(context.Context) (DeltaPageLinker, error) +type ChannelMessageDeltaEnumerator interface { + DeltaGetPager + ValuesInPageLinker[models.ChatMessageable] + SetNextLinker } // TODO: implement -// var _ MessageItemDeltaEnumerator = &messagePageCtrl{} +// var _ ChannelMessageDeltaEnumerator = &messagePageCtrl{} // type messagePageCtrl struct { // gs graph.Servicer @@ -25,12 +27,14 @@ type MessageItemDeltaEnumerator interface { // channel pager // 
--------------------------------------------------------------------------- -type ChannelItemDeltaEnumerator interface { - GetPage(context.Context) (DeltaPageLinker, error) +type ChannelDeltaEnumerator interface { + DeltaGetPager + ValuesInPageLinker[models.Channelable] + SetNextLinker } // TODO: implement -// var _ ChannelsItemDeltaEnumerator = &channelsPageCtrl{} +// var _ ChannelDeltaEnumerator = &channelsPageCtrl{} // type channelsPageCtrl struct { // gs graph.Servicer diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index ef54b1a3d..4cb272d51 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -13,9 +13,18 @@ import ( ) // --------------------------------------------------------------------------- -// common interfaces and funcs +// common interfaces // --------------------------------------------------------------------------- +// TODO(keepers): replace all matching uses of GetPage with this. +type DeltaGetPager interface { + GetPage(context.Context) (DeltaPageLinker, error) +} + +type ValuesInPageLinker[T any] interface { + ValuesIn(PageLinker) ([]T, error) +} + type PageLinker interface { GetOdataNextLink() *string } @@ -25,6 +34,14 @@ type DeltaPageLinker interface { GetOdataDeltaLink() *string } +type SetNextLinker interface { + SetNext(nextLink string) +} + +// --------------------------------------------------------------------------- +// common funcs +// --------------------------------------------------------------------------- + // IsNextLinkValid separate check to investigate whether error is func IsNextLinkValid(next string) bool { return !strings.Contains(next, `users//`)
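
Below is a minimal, self-contained sketch of how the new single-method pager interfaces added in item_pager.go (DeltaGetPager, ValuesInPageLinker, SetNextLinker) are intended to compose and be consumed. It mirrors the collectItems loop above, but uses hypothetical stand-in types (pager, fakePager, fakePage) instead of the real Graph pagers, since those implementations aren't in this branch yet.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins mirroring the interfaces added in item_pager.go.
type PageLinker interface{ GetOdataNextLink() *string }

type DeltaPageLinker interface {
	PageLinker
	GetOdataDeltaLink() *string
}

// pager composes the three single-method interfaces the way
// ChannelMessageDeltaEnumerator does.
type pager[T any] interface {
	GetPage(context.Context) (DeltaPageLinker, error)
	ValuesIn(PageLinker) ([]T, error)
	SetNext(nextLink string)
}

// fakePage and fakePager simulate a two-page Graph response.
type fakePage struct{ next, delta *string }

func (p fakePage) GetOdataNextLink() *string  { return p.next }
func (p fakePage) GetOdataDeltaLink() *string { return p.delta }

type fakePager struct {
	pages [][]string
	idx   int
}

func (p *fakePager) GetPage(context.Context) (DeltaPageLinker, error) {
	var next *string

	// every page except the last advertises a next link.
	if p.idx < len(p.pages)-1 {
		n := fmt.Sprintf("page-%d", p.idx+1)
		next = &n
	}

	return fakePage{next: next}, nil
}

func (p *fakePager) ValuesIn(PageLinker) ([]string, error) {
	return p.pages[p.idx], nil
}

func (p *fakePager) SetNext(string) { p.idx++ }

// collectAll walks pages until the next link runs out, mirroring collectItems.
func collectAll[T any](ctx context.Context, pgr pager[T]) ([]T, error) {
	var items []T

	for {
		page, err := pgr.GetPage(ctx)
		if err != nil {
			return nil, err
		}

		vals, err := pgr.ValuesIn(page)
		if err != nil {
			return nil, err
		}

		items = append(items, vals...)

		next := page.GetOdataNextLink()
		if next == nil || len(*next) == 0 {
			break
		}

		pgr.SetNext(*next)
	}

	return items, nil
}

func main() {
	p := &fakePager{pages: [][]string{{"msg-1", "msg-2"}, {"msg-3"}}}

	items, err := collectAll[string](context.Background(), p)
	if err != nil {
		panic(err)
	}

	fmt.Println(items) // [msg-1 msg-2 msg-3]
}
```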