Seems like a lot of code, but this is 95% boilerplate additions copied from groups collections packages, with a find-replace for names. Some noteworthy differences: * teamsChats does not handle metadata, so all metadata, delta, and previous path handling was removed * teamsChats does not produce tombstones * chats are never deleted, so no "removed" items are tracked * all chats get stored at the prefix root, so no "containers" are iterated, and therefore only one collection is ever produced. This means that, overall, while the boilerplate here is still the same, it's much reduced compared to similar packages. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🌻 Feature #### Issue(s) * #5062 #### Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
538 lines
13 KiB
Go
538 lines
13 KiB
Go
package groups
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/alcionai/clues"
|
|
"github.com/kopia/kopia/repo/manifest"
|
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
|
|
|
"github.com/alcionai/corso/src/internal/common/idname"
|
|
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
|
|
"github.com/alcionai/corso/src/internal/common/ptr"
|
|
"github.com/alcionai/corso/src/internal/data"
|
|
kinject "github.com/alcionai/corso/src/internal/kopia/inject"
|
|
"github.com/alcionai/corso/src/internal/m365/collection/drive"
|
|
"github.com/alcionai/corso/src/internal/m365/collection/groups"
|
|
"github.com/alcionai/corso/src/internal/m365/collection/site"
|
|
odConsts "github.com/alcionai/corso/src/internal/m365/service/onedrive/consts"
|
|
"github.com/alcionai/corso/src/internal/m365/support"
|
|
"github.com/alcionai/corso/src/internal/observe"
|
|
"github.com/alcionai/corso/src/internal/operations/inject"
|
|
"github.com/alcionai/corso/src/pkg/account"
|
|
"github.com/alcionai/corso/src/pkg/backup/identity"
|
|
"github.com/alcionai/corso/src/pkg/backup/metadata"
|
|
"github.com/alcionai/corso/src/pkg/control"
|
|
"github.com/alcionai/corso/src/pkg/count"
|
|
"github.com/alcionai/corso/src/pkg/fault"
|
|
"github.com/alcionai/corso/src/pkg/logger"
|
|
"github.com/alcionai/corso/src/pkg/path"
|
|
"github.com/alcionai/corso/src/pkg/selectors"
|
|
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
|
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
|
|
)
|
|
|
|
// groupsBackup is the groups-service implementation of the standard
// backup handler shape shared across m365/service packages.
type groupsBackup struct{}

// NewBackup provides a struct that matches standard apis
// across m365/service handlers.
func NewBackup() *groupsBackup {
	return new(groupsBackup)
}
|
|
|
|
func (groupsBackup) ProduceBackupCollections(
|
|
ctx context.Context,
|
|
bpc inject.BackupProducerConfig,
|
|
ac api.Client,
|
|
creds account.M365Config,
|
|
su support.StatusUpdater,
|
|
counter *count.Bus,
|
|
errs *fault.Bus,
|
|
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
|
|
b, err := bpc.Selector.ToGroupsBackup()
|
|
if err != nil {
|
|
return nil, nil, true, clues.Wrap(err, "groupsDataCollection: parsing selector")
|
|
}
|
|
|
|
var (
|
|
el = errs.Local()
|
|
collections = []data.BackupCollection{}
|
|
categories = map[path.CategoryType]struct{}{}
|
|
globalItemIDExclusions = prefixmatcher.NewStringSetBuilder()
|
|
sitesPreviousPaths = map[string]string{}
|
|
)
|
|
|
|
ctx = clues.Add(
|
|
ctx,
|
|
"group_id", clues.Hide(bpc.ProtectedResource.ID()),
|
|
"group_name", clues.Hide(bpc.ProtectedResource.Name()))
|
|
|
|
group, err := ac.Groups().GetByID(
|
|
ctx,
|
|
bpc.ProtectedResource.ID(),
|
|
api.CallConfig{})
|
|
if err != nil {
|
|
return nil, nil, true, clues.WrapWC(ctx, err, "getting group")
|
|
}
|
|
|
|
bc := backupCommon{ac, bpc, creds, group, sitesPreviousPaths, su}
|
|
|
|
for _, scope := range b.Scopes() {
|
|
if el.Failure() != nil {
|
|
break
|
|
}
|
|
|
|
cl := counter.Local()
|
|
ictx := clues.AddLabelCounter(ctx, cl.PlainAdder())
|
|
ictx = clues.Add(ictx, "category", scope.Category().PathType())
|
|
|
|
var colls []data.BackupCollection
|
|
|
|
switch scope.Category().PathType() {
|
|
case path.LibrariesCategory:
|
|
colls, err = backupLibraries(
|
|
ictx,
|
|
bc,
|
|
scope,
|
|
globalItemIDExclusions,
|
|
cl,
|
|
el)
|
|
case path.ChannelMessagesCategory:
|
|
colls, err = backupChannels(
|
|
ictx,
|
|
bc,
|
|
scope,
|
|
cl,
|
|
el)
|
|
case path.ConversationPostsCategory:
|
|
colls, err = backupConversations(
|
|
ictx,
|
|
bc,
|
|
scope,
|
|
cl,
|
|
el)
|
|
}
|
|
|
|
if err != nil {
|
|
el.AddRecoverable(ctx, clues.Stack(err))
|
|
continue
|
|
}
|
|
|
|
collections = append(collections, colls...)
|
|
|
|
categories[scope.Category().PathType()] = struct{}{}
|
|
}
|
|
|
|
if len(collections) > 0 {
|
|
baseCols, err := graph.BaseCollections(
|
|
ctx,
|
|
collections,
|
|
creds.AzureTenantID,
|
|
bpc.ProtectedResource.ID(),
|
|
path.GroupsService,
|
|
categories,
|
|
su,
|
|
counter,
|
|
errs)
|
|
if err != nil {
|
|
return nil, nil, true, err
|
|
}
|
|
|
|
collections = append(collections, baseCols...)
|
|
}
|
|
|
|
// Add metadata about sites
|
|
md, err := getSitesMetadataCollection(
|
|
creds.AzureTenantID,
|
|
bpc.ProtectedResource.ID(),
|
|
sitesPreviousPaths,
|
|
su,
|
|
counter)
|
|
if err != nil {
|
|
return nil, nil, true, err
|
|
}
|
|
|
|
collections = append(collections, md)
|
|
|
|
counter.Add(count.Collections, int64(len(collections)))
|
|
|
|
logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values())
|
|
|
|
return collections, globalItemIDExclusions.ToReader(), true, el.Failure()
|
|
}
|
|
|
|
// backupCommon bundles the values shared by every category-specific backup
// helper (backupLibraries, backupChannels, backupConversations) so they can
// be passed around as a single argument.
type backupCommon struct {
	apiCli         api.Client
	producerConfig inject.BackupProducerConfig
	creds          account.M365Config
	// group is the group being backed up; used for the team check in
	// backupChannels and the mail-address lookup in backupConversations.
	group models.Groupable
	// sitesPreviousPaths maps siteID -> path prefix string; written by
	// backupLibraries as each site is processed.
	sitesPreviousPaths map[string]string
	statusUpdater      support.StatusUpdater
}
|
|
|
|
func backupLibraries(
|
|
ctx context.Context,
|
|
bc backupCommon,
|
|
scope selectors.GroupsScope,
|
|
globalItemIDExclusions *prefixmatcher.StringSetMatchBuilder,
|
|
counter *count.Bus,
|
|
errs *fault.Bus,
|
|
) ([]data.BackupCollection, error) {
|
|
var (
|
|
colls = []data.BackupCollection{}
|
|
el = errs.Local()
|
|
)
|
|
|
|
sites, err := bc.apiCli.Groups().GetAllSites(
|
|
ctx,
|
|
bc.producerConfig.ProtectedResource.ID(),
|
|
errs)
|
|
if err != nil {
|
|
return nil, clues.Stack(err)
|
|
}
|
|
|
|
counter.Add(count.Sites, int64(len(sites)))
|
|
|
|
siteMetadataCollection := map[string][]data.RestoreCollection{}
|
|
|
|
// Once we have metadata collections for chat as well, we will have to filter those out
|
|
for _, c := range bc.producerConfig.MetadataCollections {
|
|
siteID := c.FullPath().Elements().Last()
|
|
siteMetadataCollection[siteID] = append(siteMetadataCollection[siteID], c)
|
|
}
|
|
|
|
for _, s := range sites {
|
|
if el.Failure() != nil {
|
|
break
|
|
}
|
|
|
|
var (
|
|
cl = counter.Local()
|
|
pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl()))
|
|
sbpc = inject.BackupProducerConfig{
|
|
LastBackupVersion: bc.producerConfig.LastBackupVersion,
|
|
Options: bc.producerConfig.Options,
|
|
ProtectedResource: pr,
|
|
Selector: bc.producerConfig.Selector,
|
|
MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())],
|
|
}
|
|
bh = drive.NewGroupBackupHandler(
|
|
bc.producerConfig.ProtectedResource.ID(),
|
|
ptr.Val(s.GetId()),
|
|
bc.apiCli.Drives(),
|
|
scope)
|
|
)
|
|
|
|
ictx := clues.Add(
|
|
ctx,
|
|
"site_id", ptr.Val(s.GetId()),
|
|
"site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl())))
|
|
|
|
sp, err := bh.SitePathPrefix(bc.creds.AzureTenantID)
|
|
if err != nil {
|
|
return nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix)
|
|
}
|
|
|
|
bc.sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String()
|
|
|
|
cs, canUsePreviousBackup, err := site.CollectLibraries(
|
|
ictx,
|
|
sbpc,
|
|
bh,
|
|
bc.creds.AzureTenantID,
|
|
globalItemIDExclusions,
|
|
bc.statusUpdater,
|
|
cl,
|
|
errs)
|
|
if err != nil {
|
|
el.AddRecoverable(ictx, err)
|
|
continue
|
|
}
|
|
|
|
if !canUsePreviousBackup {
|
|
colls = append(colls, data.NewTombstoneCollection(sp, control.Options{}, cl))
|
|
}
|
|
|
|
colls = append(colls, cs...)
|
|
}
|
|
|
|
return colls, el.Failure()
|
|
}
|
|
|
|
func backupChannels(
|
|
ctx context.Context,
|
|
bc backupCommon,
|
|
scope selectors.GroupsScope,
|
|
counter *count.Bus,
|
|
errs *fault.Bus,
|
|
) ([]data.BackupCollection, error) {
|
|
var (
|
|
colls []data.BackupCollection
|
|
canUsePreviousBackup bool
|
|
)
|
|
|
|
progressMessage := observe.MessageWithCompletion(
|
|
ctx,
|
|
observe.ProgressCfg{
|
|
Indent: 1,
|
|
// TODO(meain): Use number of messages and not channels
|
|
CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(colls)) },
|
|
},
|
|
scope.Category().PathType().HumanString())
|
|
defer close(progressMessage)
|
|
|
|
if !api.IsTeam(ctx, bc.group) {
|
|
return colls, nil
|
|
}
|
|
|
|
bh := groups.NewChannelBackupHandler(
|
|
bc.producerConfig.ProtectedResource.ID(),
|
|
bc.apiCli.Channels())
|
|
|
|
// Always disable lazy reader for channels until #4321 support is added
|
|
useLazyReader := false
|
|
|
|
colls, canUsePreviousBackup, err := groups.CreateCollections(
|
|
ctx,
|
|
bc.producerConfig,
|
|
bh,
|
|
bc.creds.AzureTenantID,
|
|
scope,
|
|
bc.statusUpdater,
|
|
useLazyReader,
|
|
counter,
|
|
errs)
|
|
if err != nil {
|
|
return nil, clues.Stack(err)
|
|
}
|
|
|
|
if !canUsePreviousBackup {
|
|
tp, err := bh.PathPrefix(bc.creds.AzureTenantID)
|
|
if err != nil {
|
|
err = clues.WrapWC(ctx, err, "getting message path").Label(count.BadPathPrefix)
|
|
return nil, err
|
|
}
|
|
|
|
colls = append(colls, data.NewTombstoneCollection(tp, control.Options{}, counter))
|
|
}
|
|
|
|
return colls, nil
|
|
}
|
|
|
|
func backupConversations(
|
|
ctx context.Context,
|
|
bc backupCommon,
|
|
scope selectors.GroupsScope,
|
|
counter *count.Bus,
|
|
errs *fault.Bus,
|
|
) ([]data.BackupCollection, error) {
|
|
groupEmail := strings.Clone(ptr.Val(bc.group.GetMail()))
|
|
// This is unlikely, but if it does happen in the wild, we should investigate it.
|
|
if len(groupEmail) == 0 {
|
|
return nil, clues.New("group has no mail address")
|
|
}
|
|
|
|
var (
|
|
bh = groups.NewConversationBackupHandler(
|
|
bc.producerConfig.ProtectedResource.ID(),
|
|
bc.apiCli.Conversations(),
|
|
groupEmail)
|
|
colls []data.BackupCollection
|
|
)
|
|
|
|
progressMessage := observe.MessageWithCompletion(
|
|
ctx,
|
|
observe.ProgressCfg{
|
|
Indent: 1,
|
|
CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(colls)) },
|
|
},
|
|
scope.Category().PathType().HumanString())
|
|
defer close(progressMessage)
|
|
|
|
useLazyReader := !bc.producerConfig.Options.ToggleFeatures.DisableLazyItemReader
|
|
|
|
colls, canUsePreviousBackup, err := groups.CreateCollections(
|
|
ctx,
|
|
bc.producerConfig,
|
|
bh,
|
|
bc.creds.AzureTenantID,
|
|
scope,
|
|
bc.statusUpdater,
|
|
useLazyReader,
|
|
counter,
|
|
errs)
|
|
if err != nil {
|
|
return nil, clues.Stack(err)
|
|
}
|
|
|
|
if !canUsePreviousBackup {
|
|
tp, err := bh.PathPrefix(bc.creds.AzureTenantID)
|
|
if err != nil {
|
|
err = clues.WrapWC(ctx, err, "getting conversation path").Label(count.BadPathPrefix)
|
|
return nil, err
|
|
}
|
|
|
|
colls = append(colls, data.NewTombstoneCollection(tp, control.Options{}, counter))
|
|
}
|
|
|
|
return colls, nil
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// metadata
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func getSitesMetadataCollection(
|
|
tenantID, groupID string,
|
|
sites map[string]string,
|
|
su support.StatusUpdater,
|
|
counter *count.Bus,
|
|
) (data.BackupCollection, error) {
|
|
p, err := path.BuildMetadata(
|
|
tenantID,
|
|
groupID,
|
|
path.GroupsService,
|
|
path.LibrariesCategory,
|
|
false)
|
|
if err != nil {
|
|
return nil, clues.Wrap(err, "making metadata path")
|
|
}
|
|
|
|
p, err = p.Append(false, odConsts.SitesPathDir)
|
|
if err != nil {
|
|
return nil, clues.Wrap(err, "appending sites to metadata path")
|
|
}
|
|
|
|
md, err := graph.MakeMetadataCollection(
|
|
p,
|
|
[]graph.MetadataCollectionEntry{
|
|
graph.NewMetadataEntry(metadata.PreviousPathFileName, sites),
|
|
},
|
|
su,
|
|
counter.Local())
|
|
|
|
return md, err
|
|
}
|
|
|
|
func MetadataFiles(
|
|
ctx context.Context,
|
|
reason identity.Reasoner,
|
|
r kinject.RestoreProducer,
|
|
manID manifest.ID,
|
|
errs *fault.Bus,
|
|
) ([][]string, error) {
|
|
pth, err := path.BuildMetadata(
|
|
reason.Tenant(),
|
|
reason.ProtectedResource(),
|
|
reason.Service(),
|
|
reason.Category(),
|
|
true,
|
|
odConsts.SitesPathDir,
|
|
metadata.PreviousPathFileName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dir, err := pth.Dir()
|
|
if err != nil {
|
|
return nil, clues.Wrap(err, "building metadata collection path")
|
|
}
|
|
|
|
dcs, err := r.ProduceRestoreCollections(
|
|
ctx,
|
|
string(manID),
|
|
[]path.RestorePaths{{StoragePath: pth, RestorePath: dir}},
|
|
nil,
|
|
errs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sites, err := deserializeSiteMetadata(ctx, dcs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filePaths := [][]string{}
|
|
|
|
for k := range sites {
|
|
for _, fn := range metadata.AllMetadataFileNames() {
|
|
filePaths = append(filePaths, []string{odConsts.SitesPathDir, k, fn})
|
|
}
|
|
}
|
|
|
|
return filePaths, nil
|
|
}
|
|
|
|
func deserializeSiteMetadata(
|
|
ctx context.Context,
|
|
cols []data.RestoreCollection,
|
|
) (map[string]string, error) {
|
|
logger.Ctx(ctx).Infow(
|
|
"deserializing previous sites metadata",
|
|
"num_collections", len(cols))
|
|
|
|
var (
|
|
prevFolders = map[string]string{}
|
|
errs = fault.New(true) // metadata item reads should not fail backup
|
|
)
|
|
|
|
for _, col := range cols {
|
|
if errs.Failure() != nil {
|
|
break
|
|
}
|
|
|
|
items := col.Items(ctx, errs)
|
|
|
|
for breakLoop := false; !breakLoop; {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, clues.WrapWC(ctx, ctx.Err(), "deserializing previous sites metadata")
|
|
|
|
case item, ok := <-items:
|
|
if !ok {
|
|
breakLoop = true
|
|
break
|
|
}
|
|
|
|
var (
|
|
err error
|
|
ictx = clues.Add(ctx, "item_uuid", item.ID())
|
|
)
|
|
|
|
switch item.ID() {
|
|
case metadata.PreviousPathFileName:
|
|
err = drive.DeserializeMap(item.ToReader(), prevFolders)
|
|
|
|
default:
|
|
logger.Ctx(ictx).Infow(
|
|
"skipping unknown metadata file",
|
|
"file_name", item.ID())
|
|
|
|
continue
|
|
}
|
|
|
|
if err == nil {
|
|
// Successful decode.
|
|
continue
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, clues.StackWC(ictx, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// if reads from items failed, return empty but no error
|
|
if errs.Failure() != nil {
|
|
logger.CtxErr(ctx, errs.Failure()).Info("reading metadata collection items")
|
|
|
|
return map[string]string{}, nil
|
|
}
|
|
|
|
return prevFolders, nil
|
|
}
|