fix messages backup deadlock (#4809)

Moving the per-scope backup code into dedicated functions lets us defer closing the scope's progress bar, rather than closing it at one specific line of code, which could be skipped on an early exit and cause a deadlock.

---

#### Does this PR need a docs update or release note?

- [x]  Yes, it's included

#### Type of change

- [x] 🐛 Bugfix

#### Test Plan

- [x] 💪 Manual
This commit is contained in:
Keepers 2023-12-08 10:33:17 -07:00 committed by GitHub
parent 54ba241fbe
commit 17c1bbda19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 237 additions and 156 deletions

View File

@ -8,9 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] (beta)
### Added
### Changed
- Memory optimizations for large scale OneDrive and SharePoint backups.
### Fixed
- Resolved a possible deadlock when backing up Teams Channel Messages.
## [v0.16.0] (beta) - 2023-11-28

View File

@ -6,6 +6,7 @@ import (
"github.com/alcionai/clues"
"github.com/kopia/kopia/repo/manifest"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
@ -27,6 +28,7 @@ import (
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
@ -46,11 +48,11 @@ func ProduceBackupCollections(
}
var (
el = errs.Local()
collections = []data.BackupCollection{}
categories = map[path.CategoryType]struct{}{}
ssmb = prefixmatcher.NewStringSetBuilder()
sitesPreviousPaths = map[string]string{}
el = errs.Local()
collections = []data.BackupCollection{}
categories = map[path.CategoryType]struct{}{}
globalItemIDExclusions = prefixmatcher.NewStringSetBuilder()
sitesPreviousPaths = map[string]string{}
)
ctx = clues.Add(
@ -66,6 +68,8 @@ func ProduceBackupCollections(
return nil, nil, clues.WrapWC(ctx, err, "getting group")
}
bc := backupCommon{ac, bpc, creds, group, sitesPreviousPaths, su}
for _, scope := range b.Scopes() {
if el.Failure() != nil {
break
@ -75,166 +79,39 @@ func ProduceBackupCollections(
ictx := clues.AddLabelCounter(ctx, cl.PlainAdder())
ictx = clues.Add(ictx, "category", scope.Category().PathType())
var dbcs []data.BackupCollection
var colls []data.BackupCollection
switch scope.Category().PathType() {
case path.LibrariesCategory:
sites, err := ac.Groups().GetAllSites(
colls, err = backupLibraries(
ictx,
bpc.ProtectedResource.ID(),
errs)
if err != nil {
return nil, nil, err
}
cl.Add(count.Sites, int64(len(sites)))
siteMetadataCollection := map[string][]data.RestoreCollection{}
// Once we have metadata collections for chat as well, we will have to filter those out
for _, c := range bpc.MetadataCollections {
siteID := c.FullPath().Elements().Last()
siteMetadataCollection[siteID] = append(siteMetadataCollection[siteID], c)
}
for _, s := range sites {
var (
scl = cl.Local()
pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl()))
sbpc = inject.BackupProducerConfig{
LastBackupVersion: bpc.LastBackupVersion,
Options: bpc.Options,
ProtectedResource: pr,
Selector: bpc.Selector,
MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())],
}
bh = drive.NewGroupBackupHandler(
bpc.ProtectedResource.ID(),
ptr.Val(s.GetId()),
ac.Drives(),
scope)
)
ictx = clues.Add(
ictx,
"site_id", ptr.Val(s.GetId()),
"site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl())))
sp, err := bh.SitePathPrefix(creds.AzureTenantID)
if err != nil {
return nil, nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix)
}
sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String()
cs, canUsePreviousBackup, err := site.CollectLibraries(
ictx,
sbpc,
bh,
creds.AzureTenantID,
ssmb,
su,
scl,
errs)
if err != nil {
el.AddRecoverable(ictx, err)
continue
}
if !canUsePreviousBackup {
dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{}, scl))
}
dbcs = append(dbcs, cs...)
}
case path.ChannelMessagesCategory:
var (
cs []data.BackupCollection
canUsePreviousBackup bool
err error
)
pcfg := observe.ProgressCfg{
Indent: 1,
// TODO(meain): Use number of messages and not channels
CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(cs)) },
}
progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString())
if !api.IsTeam(ictx, group) {
continue
}
bh := groups.NewChannelBackupHandler(bpc.ProtectedResource.ID(), ac.Channels())
cs, canUsePreviousBackup, err = groups.CreateCollections(
ictx,
bpc,
bh,
creds.AzureTenantID,
bc,
scope,
su,
globalItemIDExclusions,
cl,
errs)
if err != nil {
el.AddRecoverable(ictx, err)
continue
}
if !canUsePreviousBackup {
tp, err := bh.PathPrefix(creds.AzureTenantID)
if err != nil {
return nil, nil, clues.WrapWC(ictx, err, "getting message path").Label(count.BadPathPrefix)
}
dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, cl))
}
dbcs = append(dbcs, cs...)
close(progressBar)
case path.ConversationPostsCategory:
var (
bh = groups.NewConversationBackupHandler(bpc.ProtectedResource.ID(), ac.Conversations())
cs []data.BackupCollection
err error
)
pcfg := observe.ProgressCfg{
Indent: 1,
CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(cs)) },
}
progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString())
cs, canUsePreviousBackup, err := groups.CreateCollections(
el)
case path.ChannelMessagesCategory:
colls, err = backupChannels(
ictx,
bpc,
bh,
creds.AzureTenantID,
bc,
scope,
su,
counter,
errs)
if err != nil {
el.AddRecoverable(ictx, err)
continue
}
if !canUsePreviousBackup {
tp, err := bh.PathPrefix(creds.AzureTenantID)
if err != nil {
return nil, nil, clues.Wrap(err, "getting conversations path")
}
dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, counter))
}
dbcs = append(dbcs, cs...)
close(progressBar)
cl,
el)
case path.ConversationPostsCategory:
colls, err = backupConversations(
ictx,
bc,
scope,
cl,
el)
}
collections = append(collections, dbcs...)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
continue
}
collections = append(collections, colls...)
categories[scope.Category().PathType()] = struct{}{}
}
@ -274,7 +151,208 @@ func ProduceBackupCollections(
logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values())
return collections, ssmb.ToReader(), el.Failure()
return collections, globalItemIDExclusions.ToReader(), el.Failure()
}
// backupCommon bundles the dependencies shared by every per-category
// backup helper (libraries, channels, conversations), so each helper
// takes one common argument instead of a long parameter list.
type backupCommon struct {
	// apiCli is the m365 api client used for graph calls
	// (Groups, Drives, Channels, Conversations).
	apiCli api.Client
	// producerConfig carries the options, selector, protected resource,
	// and prior-backup metadata for this backup run.
	producerConfig inject.BackupProducerConfig
	creds          account.M365Config
	group          models.Groupable
	// sitesPreviousPaths maps site ID -> site path prefix; populated
	// while backing up libraries so later stages can match paths
	// across backups.
	sitesPreviousPaths map[string]string
	statusUpdater      support.StatusUpdater
}
// backupLibraries produces backup collections for the document libraries
// of every site attached to the group.
//
// Each site is processed with its own local counter; failures in a single
// site are recorded as recoverable on the local fault bus so the remaining
// sites still get backed up. As a side effect, bc.sitesPreviousPaths is
// populated with each site's path prefix.
//
// Returns the produced collections (including tombstone collections for
// sites whose previous backup couldn't be reused) and the local bus's
// failure, if any.
func backupLibraries(
	ctx context.Context,
	bc backupCommon,
	scope selectors.GroupsScope,
	globalItemIDExclusions *prefixmatcher.StringSetMatchBuilder,
	counter *count.Bus,
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	var (
		colls = []data.BackupCollection{}
		el    = errs.Local()
	)

	sites, err := bc.apiCli.Groups().GetAllSites(
		ctx,
		bc.producerConfig.ProtectedResource.ID(),
		errs)
	if err != nil {
		return nil, clues.Stack(err)
	}

	counter.Add(count.Sites, int64(len(sites)))

	// Bucket the metadata collections by site (the site ID is the last
	// element of each collection's full path) so that each site's producer
	// only receives its own metadata.
	siteMetadataCollection := map[string][]data.RestoreCollection{}

	// Once we have metadata collections for chat as well, we will have to filter those out
	for _, c := range bc.producerConfig.MetadataCollections {
		siteID := c.FullPath().Elements().Last()
		siteMetadataCollection[siteID] = append(siteMetadataCollection[siteID], c)
	}

	for _, s := range sites {
		// Stop iterating once a non-recoverable failure is recorded.
		if el.Failure() != nil {
			break
		}

		var (
			cl = counter.Local()
			pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl()))
			// Per-site producer config: same run options and selector,
			// scoped to this site's resource and metadata.
			sbpc = inject.BackupProducerConfig{
				LastBackupVersion:   bc.producerConfig.LastBackupVersion,
				Options:             bc.producerConfig.Options,
				ProtectedResource:   pr,
				Selector:            bc.producerConfig.Selector,
				MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())],
			}
			bh = drive.NewGroupBackupHandler(
				bc.producerConfig.ProtectedResource.ID(),
				ptr.Val(s.GetId()),
				bc.apiCli.Drives(),
				scope)
		)

		// Derive the per-site ctx from the loop-invariant ctx (not a prior
		// iteration's ictx) so site annotations don't accumulate.
		ictx := clues.Add(
			ctx,
			"site_id", ptr.Val(s.GetId()),
			"site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl())))

		sp, err := bh.SitePathPrefix(bc.creds.AzureTenantID)
		if err != nil {
			// A bad path prefix is non-recoverable: fail the whole category.
			return nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix)
		}

		// Record this site's prefix for cross-backup path matching.
		bc.sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String()

		cs, canUsePreviousBackup, err := site.CollectLibraries(
			ictx,
			sbpc,
			bh,
			bc.creds.AzureTenantID,
			globalItemIDExclusions,
			bc.statusUpdater,
			cl,
			errs)
		if err != nil {
			// Recoverable: record and continue with the next site.
			el.AddRecoverable(ictx, err)
			continue
		}

		// No usable previous backup: add a tombstone so stale state at
		// this site's prefix gets cleared.
		if !canUsePreviousBackup {
			colls = append(colls, data.NewTombstoneCollection(sp, control.Options{}, cl))
		}

		colls = append(colls, cs...)
	}

	return colls, el.Failure()
}
// backupChannels produces backup collections for the group's Teams
// channel messages.
//
// The scope's progress bar is closed via defer, so it is guaranteed to
// be released on every exit path (including the non-Teams early return
// and error returns).
func backupChannels(
	ctx context.Context,
	bc backupCommon,
	scope selectors.GroupsScope,
	counter *count.Bus,
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	// collections is also read by the progress bar's completion-message
	// closure below, so it must be declared before the bar is created.
	var (
		collections          []data.BackupCollection
		canUsePreviousBackup bool
	)

	progressCfg := observe.ProgressCfg{
		Indent: 1,
		// TODO(meain): Use number of messages and not channels
		CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(collections)) },
	}

	progressBar := observe.MessageWithCompletion(ctx, progressCfg, scope.Category().PathType().HumanString())
	// Deferring the close guarantees the bar is always released; closing
	// it at a specific line risked being skipped and deadlocking.
	defer close(progressBar)

	// Channel messages only exist for Teams-enabled groups.
	if !api.IsTeam(ctx, bc.group) {
		return collections, nil
	}

	handler := groups.NewChannelBackupHandler(
		bc.producerConfig.ProtectedResource.ID(),
		bc.apiCli.Channels())

	collections, canUsePreviousBackup, err := groups.CreateCollections(
		ctx,
		bc.producerConfig,
		handler,
		bc.creds.AzureTenantID,
		scope,
		bc.statusUpdater,
		counter,
		errs)
	if err != nil {
		return nil, clues.Stack(err)
	}

	// Without a usable previous backup, append a tombstone so prior state
	// at this path prefix is cleared on merge.
	if !canUsePreviousBackup {
		prefix, err := handler.PathPrefix(bc.creds.AzureTenantID)
		if err != nil {
			return nil, clues.WrapWC(ctx, err, "getting message path").Label(count.BadPathPrefix)
		}

		collections = append(collections, data.NewTombstoneCollection(prefix, control.Options{}, counter))
	}

	return collections, nil
}
// backupConversations produces backup collections for the group
// mailbox's conversation posts.
//
// The scope's progress bar is closed via defer, guaranteeing release on
// every exit path (success or error).
func backupConversations(
	ctx context.Context,
	bc backupCommon,
	scope selectors.GroupsScope,
	counter *count.Bus,
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	handler := groups.NewConversationBackupHandler(
		bc.producerConfig.ProtectedResource.ID(),
		bc.apiCli.Conversations())

	// collections is read by the completion-message closure below, so it
	// must be declared before the progress bar is created.
	var collections []data.BackupCollection

	progressCfg := observe.ProgressCfg{
		Indent:            1,
		CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(collections)) },
	}

	progressBar := observe.MessageWithCompletion(ctx, progressCfg, scope.Category().PathType().HumanString())
	// Deferred close ensures the bar is released even on early error returns.
	defer close(progressBar)

	collections, canUsePreviousBackup, err := groups.CreateCollections(
		ctx,
		bc.producerConfig,
		handler,
		bc.creds.AzureTenantID,
		scope,
		bc.statusUpdater,
		counter,
		errs)
	if err != nil {
		return nil, clues.Stack(err)
	}

	// Without a usable previous backup, append a tombstone so prior state
	// at this path prefix is cleared on merge.
	if !canUsePreviousBackup {
		prefix, err := handler.PathPrefix(bc.creds.AzureTenantID)
		if err != nil {
			return nil, clues.WrapWC(ctx, err, "getting conversation path").Label(count.BadPathPrefix)
		}

		collections = append(collections, data.NewTombstoneCollection(prefix, control.Options{}, counter))
	}

	return collections, nil
}
// ---------------------------------------------------------------------------