diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go
index 359701629..f5ebd1783 100644
--- a/src/internal/m365/collection/exchange/backup.go
+++ b/src/internal/m365/collection/exchange/backup.go
@@ -75,8 +75,6 @@ func CreateCollections(
 		return nil, clues.Wrap(err, "filling collections")
 	}
 
-	foldersComplete <- struct{}{}
-
 	for _, coll := range collections {
 		allCollections = append(allCollections, coll)
 	}
diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go
index 8e0c0f897..ba421763c 100644
--- a/src/internal/m365/collection/exchange/collection.go
+++ b/src/internal/m365/collection/exchange/collection.go
@@ -39,8 +39,7 @@ const (
 // Collection implements the interface from data.Collection
 // Structure holds data for an Exchange application for a single user
 type Collection struct {
-	// M365 user
-	user string // M365 user
+	user string
 
 	data chan data.Item
 	// added is a list of existing item IDs that were added to a container
diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go
new file mode 100644
index 000000000..9b31126a1
--- /dev/null
+++ b/src/internal/m365/collection/groups/backup.go
@@ -0,0 +1,318 @@
+package groups
+
+import (
+	"context"
+
+	"github.com/alcionai/clues"
+	"github.com/microsoftgraph/msgraph-sdk-go/models"
+
+	"github.com/alcionai/corso/src/internal/common/ptr"
+	"github.com/alcionai/corso/src/internal/data"
+	"github.com/alcionai/corso/src/internal/m365/graph"
+	"github.com/alcionai/corso/src/internal/m365/support"
+	"github.com/alcionai/corso/src/internal/observe"
+	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/logger"
+	"github.com/alcionai/corso/src/pkg/path"
+	"github.com/alcionai/corso/src/pkg/selectors"
+	"github.com/alcionai/corso/src/pkg/services/m365/api"
+)
+
+// TODO: incremental support
+// multiple lines in this file are commented out so that
+// we can focus on v0 backups and re-integrate them later
+// for v1 incrementals.
+// since these lines represent otherwise standard boilerplate,
+// it's simpler to comment them for tracking than to delete
+// and re-discover them later.
+
+func CreateCollections(
+	ctx context.Context,
+	bpc inject.BackupProducerConfig,
+	handler BackupHandler,
+	tenantID string,
+	scope selectors.GroupsScope,
+	// dps DeltaPaths,
+	su support.StatusUpdater,
+	errs *fault.Bus,
+) ([]data.BackupCollection, error) {
+	ctx = clues.Add(ctx, "category", scope.Category().PathType())
+
+	var (
+		allCollections = make([]data.BackupCollection, 0)
+		category       = scope.Category().PathType()
+		qp             = graph.QueryParams{
+			Category:          category,
+			ProtectedResource: bpc.ProtectedResource,
+			TenantID:          tenantID,
+		}
+	)
+
+	catProgress := observe.MessageWithCompletion(
+		ctx,
+		observe.Bulletf("%s", qp.Category))
+	defer close(catProgress)
+
+	// TODO(keepers): probably shouldn't call out channels here specifically.
+	// This should be a generic container handler. But we don't need
+	// to worry about that until if/when we use this code to get email
+	// conversations as well.
+	// Also, this should be produced by the Handler.
+	// chanPager := handler.NewChannelsPager(qp.ProtectedResource.ID())
+	// TODO(neha): enumerate channels
+	channels := []graph.Displayable{}
+
+	collections, err := populateCollections(
+		ctx,
+		qp,
+		handler,
+		su,
+		channels,
+		scope,
+		// dps,
+		bpc.Options,
+		errs)
+	if err != nil {
+		return nil, clues.Wrap(err, "filling collections")
+	}
+
+	for _, coll := range collections {
+		allCollections = append(allCollections, coll)
+	}
+
+	return allCollections, nil
+}
+
+func populateCollections(
+	ctx context.Context,
+	qp graph.QueryParams,
+	bh BackupHandler,
+	statusUpdater support.StatusUpdater,
+	channels []graph.Displayable,
+	scope selectors.GroupsScope,
+	// dps DeltaPaths,
+	ctrlOpts control.Options,
+	errs *fault.Bus,
+) (map[string]data.BackupCollection, error) {
+	// channel ID -> BackupCollection.
+	channelCollections := map[string]data.BackupCollection{}
+
+	// channel ID -> delta url or folder path lookups
+	// TODO(neha/keepers): figure out if deltas are stored per channel, or per group.
+	// deltaURLs = map[string]string{}
+	// currPaths = map[string]string{}
+	// copy of previousPaths. every channel present in the slice param
+	// gets removed from this map; the remaining channels at the end of
+	// the process have been deleted.
+	// tombstones = makeTombstones(dps)
+
+	logger.Ctx(ctx).Infow("filling collections")
+	// , "len_deltapaths", len(dps))
+
+	el := errs.Local()
+
+	for _, c := range channels {
+		if el.Failure() != nil {
+			return nil, el.Failure()
+		}
+
+		cID := ptr.Val(c.GetId())
+		// delete(tombstones, cID)
+
+		var (
+			err error
+			// dp          = dps[cID]
+			// prevDelta   = dp.Delta
+			// prevPathStr = dp.Path // do not log: pii; log prevPath instead
+			// prevPath    path.Path
+			ictx = clues.Add(
+				ctx,
+				"channel_id", cID)
+			// "previous_delta", pii.SafeURL{
+			// 	URL:           prevDelta,
+			// 	SafePathElems: graph.SafeURLPathParams,
+			// 	SafeQueryKeys: graph.SafeURLQueryParams,
+			// })
+		)
+
+		// currPath, locPath
+		// TODO(rkeepers): the handler should provide this functionality.
+		// Only create a collection if the path matches the scope.
+		if !includeContainer(ictx, qp, c, scope, qp.Category) {
+			continue
+		}
+
+		// if len(prevPathStr) > 0 {
+		// 	if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
+		// 		logger.CtxErr(ictx, err).Error("parsing prev path")
+		// 		// if the previous path is unusable, then the delta must be, too.
+		// 		prevDelta = ""
+		// 	}
+		// }
+
+		// ictx = clues.Add(ictx, "previous_path", prevPath)
+
+		// TODO: the handler should provide this implementation.
+		items, err := collectItems(
+			ctx,
+			bh.NewMessagePager(qp.ProtectedResource.ID(), ptr.Val(c.GetId())))
+		if err != nil {
+			el.AddRecoverable(ctx, clues.Stack(err))
+			continue
+		}
+
+		// if len(newDelta.URL) > 0 {
+		// 	deltaURLs[cID] = newDelta.URL
+		// } else if !newDelta.Reset {
+		// 	logger.Ctx(ictx).Info("missing delta url")
+		// }
+
+		var prevPath path.Path
+
+		// TODO: retrieve from handler
+		currPath, err := path.Builder{}.
+			Append(ptr.Val(c.GetId())).
+			ToDataLayerPath(
+				qp.TenantID,
+				qp.ProtectedResource.ID(),
+				path.GroupsService,
+				qp.Category,
+				true)
+		if err != nil {
+			el.AddRecoverable(ctx, clues.Stack(err))
+			continue
+		}
+
+		edc := NewCollection(
+			qp.ProtectedResource.ID(),
+			currPath,
+			prevPath,
+			path.Builder{}.Append(ptr.Val(c.GetDisplayName())),
+			qp.Category,
+			statusUpdater,
+			ctrlOpts)
+
+		channelCollections[cID] = &edc
+
+		// TODO: handle deleted items for v1 backup.
+		// // Remove any deleted IDs from the set of added IDs because items that are
+		// // deleted and then restored will have a different ID than they did
+		// // originally.
+		// for _, remove := range removed {
+		// 	delete(edc.added, remove)
+		// 	edc.removed[remove] = struct{}{}
+		// }
+
+		// // add the current path for the container ID to be used in the next backup
+		// // as the "previous path", for reference in case of a rename or relocation.
+		// currPaths[cID] = currPath.String()
+
+		// FIXME: normally this goes before removal, but linters
+		for _, item := range items {
+			edc.added[ptr.Val(item.GetId())] = struct{}{}
+		}
+	}
+
+	// TODO: handle tombstones here
+
+	logger.Ctx(ctx).Infow(
+		"adding metadata collection entries",
+		// "num_deltas_entries", len(deltaURLs),
+		"num_paths_entries", len(channelCollections))
+
+	// col, err := graph.MakeMetadataCollection(
+	// 	qp.TenantID,
+	// 	qp.ProtectedResource.ID(),
+	// 	path.ExchangeService,
+	// 	qp.Category,
+	// 	[]graph.MetadataCollectionEntry{
+	// 		graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
+	// 		graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
+	// 	},
+	// 	statusUpdater)
+	// if err != nil {
+	// 	return nil, clues.Wrap(err, "making metadata collection")
+	// }
+
+	// channelCollections["metadata"] = col
+
+	return channelCollections, el.Failure()
+}
+
+func collectItems(
+	ctx context.Context,
+	pager api.ChannelMessageDeltaEnumerator,
+) ([]models.ChatMessageable, error) {
+	items := []models.ChatMessageable{}
+
+	for {
+		// assume delta urls here, which allows single-token consumption
+		page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC))
+		if err != nil {
+			return nil, graph.Wrap(ctx, err, "getting page")
+		}
+
+		// if graph.IsErrInvalidDelta(err) {
+		// 	logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta)
+
+		// 	invalidPrevDelta = true
+		// 	newPaths = map[string]string{}
+
+		// 	pager.Reset()
+
+		// 	continue
+		// }
+
+		vals, err := pager.ValuesIn(page)
+		if err != nil {
+			return nil, graph.Wrap(ctx, err, "getting items in page")
+		}
+
+		items = append(items, vals...)
+
+		nextLink, _ := api.NextAndDeltaLink(page)
+
+		// if len(deltaLink) > 0 {
+		// 	newDeltaURL = deltaLink
+		// }
+
+		// Check if there are more items
+		if len(nextLink) == 0 {
+			break
+		}
+
+		logger.Ctx(ctx).Debugw("found nextLink", "next_link", nextLink)
+		pager.SetNext(nextLink)
+	}
+
+	return items, nil
+}
+
+// includeContainer returns true if the container passes the scope
+// comparison and should be included in the backup.
+// The repository storage path and the human-readable path (display
+// names) are not returned here; the caller builds them when creating
+// the collection.
+func includeContainer(
+	ctx context.Context,
+	qp graph.QueryParams,
+	gd graph.Displayable,
+	scope selectors.GroupsScope,
+	category path.CategoryType,
+) bool {
+	// assume a single-level hierarchy
+	directory := ptr.Val(gd.GetDisplayName())
+
+	// TODO(keepers): awaiting parent branch to update to main
+	ok := scope.Matches(selectors.GroupsCategoryUnknown, directory)
+
+	logger.Ctx(ctx).With(
+		"included", ok,
+		"scope", scope,
+		"match_target", directory,
+	).Debug("backup folder selection filter")
+
+	return ok
+}
diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go
new file mode 100644
index 000000000..c1e6a4042
--- /dev/null
+++ b/src/internal/m365/collection/groups/collection.go
@@ -0,0 +1,180 @@
+package groups
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"time"
+
+	"github.com/alcionai/corso/src/internal/data"
+	"github.com/alcionai/corso/src/internal/m365/support"
+	"github.com/alcionai/corso/src/pkg/backup/details"
+	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/path"
+)
+
+var (
+	_ data.BackupCollection = &Collection{}
+	_ data.Item             = &Item{}
+	_ data.ItemInfo         = &Item{}
+	_ data.ItemModTime      = &Item{}
+)
+
+const (
+	collectionChannelBufferSize = 1000
+	numberOfRetries             = 4
+)
+
+type Collection struct {
+	protectedResource string
+	items             chan data.Item
+
+	// added is a list of existing item IDs that were added to a container
+	added map[string]struct{}
+	// removed is a list of item IDs that were deleted from, or moved out, of a container
+	removed map[string]struct{}
+
+	// items itemGetterSerializer
+
+	category      path.CategoryType
+	statusUpdater support.StatusUpdater
+	ctrl          control.Options
+
+	// FullPath is the current hierarchical path used by this collection.
+	fullPath path.Path
+
+	// PrevPath is the previous hierarchical path used by this collection.
+	// It may be the same as fullPath, if the folder was not renamed or
+	// moved. It will be empty on its first retrieval.
+	prevPath path.Path
+
+	// LocationPath contains the path with human-readable display names.
+	// IE: "/Inbox/Important" instead of "/abcdxyz123/algha=lgkhal=t"
+	locationPath *path.Builder
+
+	state data.CollectionState
+
+	// doNotMergeItems should only be true if the old delta token expired.
+	// doNotMergeItems bool
+}
+
+// NewCollection creates a Collection.
+// State of the collection is set as an observation of the current
+// and previous paths. If the curr path is nil, the state is assumed
+// to be deleted. If the prev path is nil, it is assumed newly created.
+// If both are populated, then state is either moved (if they differ),
+// or notMoved (if they match).
+func NewCollection(
+	protectedResource string,
+	curr, prev path.Path,
+	location *path.Builder,
+	category path.CategoryType,
+	statusUpdater support.StatusUpdater,
+	ctrlOpts control.Options,
+	// doNotMergeItems bool,
+) Collection {
+	collection := Collection{
+		added:             make(map[string]struct{}, 0),
+		category:          category,
+		ctrl:              ctrlOpts,
+		items:             make(chan data.Item, collectionChannelBufferSize),
+		// doNotMergeItems: doNotMergeItems,
+		fullPath:          curr,
+		locationPath:      location,
+		prevPath:          prev,
+		removed:           make(map[string]struct{}, 0),
+		state:             data.StateOf(prev, curr),
+		statusUpdater:     statusUpdater,
+		protectedResource: protectedResource,
+	}
+
+	return collection
+}
+
+// Items asynchronously fills the data channel with this collection's
+// M365 items and returns that channel.
+func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Item {
+	// go col.streamItems(ctx, errs)
+	return col.items
+}
+
+// FullPath returns the Collection's full path.
+func (col *Collection) FullPath() path.Path {
+	return col.fullPath
+}
+
+// LocationPath produces the Collection's full path, but with display names
+// instead of IDs in the folders.
+func (col *Collection) LocationPath() *path.Builder {
+	return col.locationPath
+}
+
+// TODO(ashmrtn): Fill in with previous path once the Controller compares old
+// and new folder hierarchies.
+func (col Collection) PreviousPath() path.Path {
+	return col.prevPath
+}
+
+func (col Collection) State() data.CollectionState {
+	return col.state
+}
+
+func (col Collection) DoNotMergeItems() bool {
+	// TODO: depends on whether or not deltas are valid
+	return true
+}
+
+// ---------------------------------------------------------------------------
+// items
+// ---------------------------------------------------------------------------
+
+// Item represents a single item retrieved from a group channel.
+type Item struct {
+	id string
+	// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
+	// going forward. Using []byte for now but I assume we'll have
+	// some structured type in here (serialization to []byte can be done in `Read`)
+	message []byte
+	info    *details.ExchangeInfo // temporary change to bring populate function into directory
+	// TODO(ashmrtn): Can probably eventually be sourced from info as there's a
+	// request to provide modtime in ItemInfo structs.
+	modTime time.Time
+
+	// true if the item was marked by graph as deleted.
+	deleted bool
+}
+
+func (i *Item) ID() string {
+	return i.id
+}
+
+func (i *Item) ToReader() io.ReadCloser {
+	return io.NopCloser(bytes.NewReader(i.message))
+}
+
+func (i Item) Deleted() bool {
+	return i.deleted
+}
+
+func (i *Item) Info() details.ItemInfo {
+	return details.ItemInfo{Exchange: i.info}
+}
+
+func (i *Item) ModTime() time.Time {
+	return i.modTime
+}
+
+func NewItem(
+	identifier string,
+	dataBytes []byte,
+	detail details.ExchangeInfo,
+	modTime time.Time,
+) Item {
+	return Item{
+		id:      identifier,
+		message: dataBytes,
+		info:    &detail,
+		modTime: modTime,
+	}
+}
diff --git a/src/internal/m365/collection/groups/handler.go b/src/internal/m365/collection/groups/handler.go
deleted file mode 100644
index d4a382149..000000000
--- a/src/internal/m365/collection/groups/handler.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package groups
-
-import (
-	"context"
-
-	"github.com/microsoft/kiota-abstractions-go/serialization"
-	"github.com/microsoftgraph/msgraph-sdk-go/models"
-
-	"github.com/alcionai/corso/src/pkg/services/m365/api"
-)
-
-type BackupMessagesHandler interface {
-	GetMessageByID(ctx context.Context, teamID, channelID, itemID string) (models.ChatMessageable, error)
-	NewMessagePager(teamID, channelID string) api.MessageItemDeltaEnumerator
-	GetChannelByID(ctx context.Context, teamID, channelID string) (models.Channelable, error)
-	NewChannelPager(teamID, channelID string) api.ChannelItemDeltaEnumerator
-	GetReplyByID(ctx context.Context, teamID, channelID, messageID string) (serialization.Parsable, error)
-}
diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go
new file mode 100644
index 000000000..f5a28fd28
--- /dev/null
+++ b/src/internal/m365/collection/groups/handlers.go
@@ -0,0 +1,33 @@
+package groups
+
+import (
+	"context"
+
+	"github.com/microsoft/kiota-abstractions-go/serialization"
+	"github.com/microsoftgraph/msgraph-sdk-go/models"
+
+	"github.com/alcionai/corso/src/pkg/services/m365/api"
+)
+
+type BackupHandler interface {
+	GetChannelByID(
+		ctx context.Context,
+		teamID, channelID string,
+	) (models.Channelable, error)
+	NewChannelsPager(
+		teamID string,
+	) api.ChannelDeltaEnumerator
+
+	GetMessageByID(
+		ctx context.Context,
+		teamID, channelID, itemID string,
+	) (models.ChatMessageable, error)
+	NewMessagePager(
+		teamID, channelID string,
+	) api.ChannelMessageDeltaEnumerator
+
+	GetMessageReplies(
+		ctx context.Context,
+		teamID, channelID, messageID string,
+	) (serialization.Parsable, error)
+}
diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go
index 599c09649..58aecaf6c 100644
--- a/src/pkg/services/m365/api/channels_pager.go
+++ b/src/pkg/services/m365/api/channels_pager.go
@@ -1,19 +1,21 @@
 package api
 
 import (
-	"context"
+	"github.com/microsoftgraph/msgraph-sdk-go/models"
 )
 
 // ---------------------------------------------------------------------------
 // item pager
 // ---------------------------------------------------------------------------
 
-type MessageItemDeltaEnumerator interface {
-	GetPage(context.Context) (DeltaPageLinker, error)
+type ChannelMessageDeltaEnumerator interface {
+	DeltaGetPager
+	ValuesInPageLinker[models.ChatMessageable]
+	SetNextLinker
 }
 
 // TODO: implement
-// var _ MessageItemDeltaEnumerator = &messagePageCtrl{}
+// var _ ChannelMessageDeltaEnumerator = &messagePageCtrl{}
 
 // type messagePageCtrl struct {
 // 	gs      graph.Servicer
@@ -25,12 +27,14 @@ type MessageItemDeltaEnumerator interface {
 // channel pager
 // ---------------------------------------------------------------------------
 
-type ChannelItemDeltaEnumerator interface {
-	GetPage(context.Context) (DeltaPageLinker, error)
+type ChannelDeltaEnumerator interface {
+	DeltaGetPager
+	ValuesInPageLinker[models.Channelable]
+	SetNextLinker
 }
 
 // TODO: implement
-// var _ ChannelsItemDeltaEnumerator = &channelsPageCtrl{}
+// var _ ChannelDeltaEnumerator = &channelsPageCtrl{}
 
 // type channelsPageCtrl struct {
 // 	gs graph.Servicer
diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go
index ef54b1a3d..4cb272d51 100644
--- a/src/pkg/services/m365/api/item_pager.go
+++ b/src/pkg/services/m365/api/item_pager.go
@@ -13,9 +13,18 @@ import (
 )
 
 // ---------------------------------------------------------------------------
-// common interfaces and funcs
+// common interfaces
 // ---------------------------------------------------------------------------
 
+// TODO(keepers): replace all matching uses of GetPage with this.
+type DeltaGetPager interface {
+	GetPage(context.Context) (DeltaPageLinker, error)
+}
+
+type ValuesInPageLinker[T any] interface {
+	ValuesIn(PageLinker) ([]T, error)
+}
+
 type PageLinker interface {
 	GetOdataNextLink() *string
 }
@@ -25,6 +34,14 @@ type DeltaPageLinker interface {
 	GetOdataDeltaLink() *string
 }
 
+type SetNextLinker interface {
+	SetNext(nextLink string)
+}
+
+// ---------------------------------------------------------------------------
+// common funcs
+// ---------------------------------------------------------------------------
+
 // IsNextLinkValid separate check to investigate whether error is
 func IsNextLinkValid(next string) bool {
 	return !strings.Contains(next, `users//`)
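
Note: the sketch below is illustrative commentary, not part of the diff above. Assuming the hypothetical names fakeMessagePager and fakeMessagePage, it shows how the composed api.ChannelMessageDeltaEnumerator interface (DeltaGetPager plus ValuesInPageLinker[models.ChatMessageable] plus SetNextLinker) could be satisfied by a small in-memory fake, for example to drive collectItems in a unit test. A real pager, presumably the commented-out messagePageCtrl stub, would issue Graph requests in GetPage and rebuild its request from the nextLink passed to SetNext.

package api_test

import (
	"context"
	"errors"

	"github.com/microsoftgraph/msgraph-sdk-go/models"

	"github.com/alcionai/corso/src/pkg/services/m365/api"
)

// fakeMessagePage is a hypothetical DeltaPageLinker over a static value set.
type fakeMessagePage struct {
	next, delta *string
	values      []models.ChatMessageable
}

func (p fakeMessagePage) GetOdataNextLink() *string  { return p.next }
func (p fakeMessagePage) GetOdataDeltaLink() *string { return p.delta }

// fakeMessagePager walks a pre-built slice of pages; hypothetical, test-only.
type fakeMessagePager struct {
	pages []fakeMessagePage
	idx   int
}

var _ api.ChannelMessageDeltaEnumerator = &fakeMessagePager{}

// GetPage returns the next canned page, or an error when all pages are consumed.
func (p *fakeMessagePager) GetPage(context.Context) (api.DeltaPageLinker, error) {
	if p.idx >= len(p.pages) {
		return nil, errors.New("no pages remain")
	}

	page := p.pages[p.idx]
	p.idx++

	return page, nil
}

// ValuesIn unpacks the items carried by a page produced by GetPage.
func (p *fakeMessagePager) ValuesIn(pl api.PageLinker) ([]models.ChatMessageable, error) {
	page, ok := pl.(fakeMessagePage)
	if !ok {
		return nil, errors.New("unexpected page type")
	}

	return page.values, nil
}

// SetNext is a no-op for the fake; a real pager would rebuild its next
// request from the provided nextLink.
func (p *fakeMessagePager) SetNext(nextLink string) {}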