diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dcfcdb0f..0c043956e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] (beta) ### Added + ### Changed - Memory optimizations for large scale OneDrive and Sharepoint backups. + ### Fixed +- Resolved a possible deadlock when backing up Teams Channel Messages. ## [v0.16.0] (beta) - 2023-11-28 diff --git a/src/internal/m365/service/groups/backup.go b/src/internal/m365/service/groups/backup.go index 26401cdd4..72b431d15 100644 --- a/src/internal/m365/service/groups/backup.go +++ b/src/internal/m365/service/groups/backup.go @@ -6,6 +6,7 @@ import ( "github.com/alcionai/clues" "github.com/kopia/kopia/repo/manifest" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/prefixmatcher" @@ -27,6 +28,7 @@ import ( "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api/graph" ) @@ -46,11 +48,11 @@ func ProduceBackupCollections( } var ( - el = errs.Local() - collections = []data.BackupCollection{} - categories = map[path.CategoryType]struct{}{} - ssmb = prefixmatcher.NewStringSetBuilder() - sitesPreviousPaths = map[string]string{} + el = errs.Local() + collections = []data.BackupCollection{} + categories = map[path.CategoryType]struct{}{} + globalItemIDExclusions = prefixmatcher.NewStringSetBuilder() + sitesPreviousPaths = map[string]string{} ) ctx = clues.Add( @@ -66,6 +68,8 @@ func ProduceBackupCollections( return nil, nil, clues.WrapWC(ctx, err, "getting group") } + bc := backupCommon{ac, bpc, creds, group, sitesPreviousPaths, su} + for _, scope := range b.Scopes() { if el.Failure() != nil { break @@ 
-75,166 +79,39 @@ func ProduceBackupCollections( ictx := clues.AddLabelCounter(ctx, cl.PlainAdder()) ictx = clues.Add(ictx, "category", scope.Category().PathType()) - var dbcs []data.BackupCollection + var colls []data.BackupCollection switch scope.Category().PathType() { case path.LibrariesCategory: - sites, err := ac.Groups().GetAllSites( + colls, err = backupLibraries( ictx, - bpc.ProtectedResource.ID(), - errs) - if err != nil { - return nil, nil, err - } - - cl.Add(count.Sites, int64(len(sites))) - - siteMetadataCollection := map[string][]data.RestoreCollection{} - - // Once we have metadata collections for chat as well, we will have to filter those out - for _, c := range bpc.MetadataCollections { - siteID := c.FullPath().Elements().Last() - siteMetadataCollection[siteID] = append(siteMetadataCollection[siteID], c) - } - - for _, s := range sites { - var ( - scl = cl.Local() - pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl())) - sbpc = inject.BackupProducerConfig{ - LastBackupVersion: bpc.LastBackupVersion, - Options: bpc.Options, - ProtectedResource: pr, - Selector: bpc.Selector, - MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())], - } - bh = drive.NewGroupBackupHandler( - bpc.ProtectedResource.ID(), - ptr.Val(s.GetId()), - ac.Drives(), - scope) - ) - - ictx = clues.Add( - ictx, - "site_id", ptr.Val(s.GetId()), - "site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl()))) - - sp, err := bh.SitePathPrefix(creds.AzureTenantID) - if err != nil { - return nil, nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix) - } - - sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String() - - cs, canUsePreviousBackup, err := site.CollectLibraries( - ictx, - sbpc, - bh, - creds.AzureTenantID, - ssmb, - su, - scl, - errs) - if err != nil { - el.AddRecoverable(ictx, err) - continue - } - - if !canUsePreviousBackup { - dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{}, scl)) - } - - dbcs = append(dbcs, cs...) 
- } - case path.ChannelMessagesCategory: - var ( - cs []data.BackupCollection - canUsePreviousBackup bool - err error - ) - - pcfg := observe.ProgressCfg{ - Indent: 1, - // TODO(meain): Use number of messages and not channels - CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(cs)) }, - } - progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString()) - - if !api.IsTeam(ictx, group) { - continue - } - - bh := groups.NewChannelBackupHandler(bpc.ProtectedResource.ID(), ac.Channels()) - - cs, canUsePreviousBackup, err = groups.CreateCollections( - ictx, - bpc, - bh, - creds.AzureTenantID, + bc, scope, - su, + globalItemIDExclusions, cl, - errs) - if err != nil { - el.AddRecoverable(ictx, err) - continue - } - - if !canUsePreviousBackup { - tp, err := bh.PathPrefix(creds.AzureTenantID) - if err != nil { - return nil, nil, clues.WrapWC(ictx, err, "getting message path").Label(count.BadPathPrefix) - } - - dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, cl)) - } - - dbcs = append(dbcs, cs...) 
-
-			close(progressBar)
-		case path.ConversationPostsCategory:
-			var (
-				bh = groups.NewConversationBackupHandler(bpc.ProtectedResource.ID(), ac.Conversations())
-				cs  []data.BackupCollection
-				err error
-			)
-
-			pcfg := observe.ProgressCfg{
-				Indent:            1,
-				CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(cs)) },
-			}
-			progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString())
-
-			cs, canUsePreviousBackup, err := groups.CreateCollections(
+				el)
+		case path.ChannelMessagesCategory:
+			colls, err = backupChannels(
 				ictx,
-				bpc,
-				bh,
-				creds.AzureTenantID,
+				bc,
 				scope,
-				su,
-				counter,
-				errs)
-			if err != nil {
-				el.AddRecoverable(ictx, err)
-				continue
-			}
-
-			if !canUsePreviousBackup {
-				tp, err := bh.PathPrefix(creds.AzureTenantID)
-				if err != nil {
-					return nil, nil, clues.Wrap(err, "getting conversations path")
-				}
-
-				dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, counter))
-			}
-
-			dbcs = append(dbcs, cs...)
-
-			close(progressBar)
+				cl,
+				el)
+		case path.ConversationPostsCategory:
+			colls, err = backupConversations(
+				ictx,
+				bc,
+				scope,
+				cl,
+				el)
 		}
 
-		collections = append(collections, dbcs...)
+		if err != nil {
+			// NOTE(review): use the per-iteration ictx (carries the label
+			// counter and "category" clue), not the outer ctx — matches the
+			// pre-refactor behavior of el.AddRecoverable(ictx, err).
+			el.AddRecoverable(ictx, clues.Stack(err))
+			continue
+		}
+
+		collections = append(collections, colls...)
categories[scope.Category().PathType()] = struct{}{} } @@ -274,7 +151,208 @@ func ProduceBackupCollections( logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values()) - return collections, ssmb.ToReader(), el.Failure() + return collections, globalItemIDExclusions.ToReader(), el.Failure() +} + +type backupCommon struct { + apiCli api.Client + producerConfig inject.BackupProducerConfig + creds account.M365Config + group models.Groupable + sitesPreviousPaths map[string]string + statusUpdater support.StatusUpdater +} + +func backupLibraries( + ctx context.Context, + bc backupCommon, + scope selectors.GroupsScope, + globalItemIDExclusions *prefixmatcher.StringSetMatchBuilder, + counter *count.Bus, + errs *fault.Bus, +) ([]data.BackupCollection, error) { + var ( + colls = []data.BackupCollection{} + el = errs.Local() + ) + + sites, err := bc.apiCli.Groups().GetAllSites( + ctx, + bc.producerConfig.ProtectedResource.ID(), + errs) + if err != nil { + return nil, clues.Stack(err) + } + + counter.Add(count.Sites, int64(len(sites))) + + siteMetadataCollection := map[string][]data.RestoreCollection{} + + // Once we have metadata collections for chat as well, we will have to filter those out + for _, c := range bc.producerConfig.MetadataCollections { + siteID := c.FullPath().Elements().Last() + siteMetadataCollection[siteID] = append(siteMetadataCollection[siteID], c) + } + + for _, s := range sites { + if el.Failure() != nil { + break + } + + var ( + cl = counter.Local() + pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl())) + sbpc = inject.BackupProducerConfig{ + LastBackupVersion: bc.producerConfig.LastBackupVersion, + Options: bc.producerConfig.Options, + ProtectedResource: pr, + Selector: bc.producerConfig.Selector, + MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())], + } + bh = drive.NewGroupBackupHandler( + bc.producerConfig.ProtectedResource.ID(), + ptr.Val(s.GetId()), + bc.apiCli.Drives(), + scope) + ) + + ictx := clues.Add( + 
ctx, + "site_id", ptr.Val(s.GetId()), + "site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl()))) + + sp, err := bh.SitePathPrefix(bc.creds.AzureTenantID) + if err != nil { + return nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix) + } + + bc.sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String() + + cs, canUsePreviousBackup, err := site.CollectLibraries( + ictx, + sbpc, + bh, + bc.creds.AzureTenantID, + globalItemIDExclusions, + bc.statusUpdater, + cl, + errs) + if err != nil { + el.AddRecoverable(ictx, err) + continue + } + + if !canUsePreviousBackup { + colls = append(colls, data.NewTombstoneCollection(sp, control.Options{}, cl)) + } + + colls = append(colls, cs...) + } + + return colls, el.Failure() +} + +func backupChannels( + ctx context.Context, + bc backupCommon, + scope selectors.GroupsScope, + counter *count.Bus, + errs *fault.Bus, +) ([]data.BackupCollection, error) { + var ( + colls []data.BackupCollection + canUsePreviousBackup bool + ) + + pcfg := observe.ProgressCfg{ + Indent: 1, + // TODO(meain): Use number of messages and not channels + CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(colls)) }, + } + progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString()) + + defer close(progressBar) + + if !api.IsTeam(ctx, bc.group) { + return colls, nil + } + + bh := groups.NewChannelBackupHandler( + bc.producerConfig.ProtectedResource.ID(), + bc.apiCli.Channels()) + + colls, canUsePreviousBackup, err := groups.CreateCollections( + ctx, + bc.producerConfig, + bh, + bc.creds.AzureTenantID, + scope, + bc.statusUpdater, + counter, + errs) + if err != nil { + return nil, clues.Stack(err) + } + + if !canUsePreviousBackup { + tp, err := bh.PathPrefix(bc.creds.AzureTenantID) + if err != nil { + err = clues.WrapWC(ctx, err, "getting message path").Label(count.BadPathPrefix) + return nil, err + } + + colls = append(colls, data.NewTombstoneCollection(tp, control.Options{}, 
counter)) + } + + return colls, nil +} + +func backupConversations( + ctx context.Context, + bc backupCommon, + scope selectors.GroupsScope, + counter *count.Bus, + errs *fault.Bus, +) ([]data.BackupCollection, error) { + var ( + bh = groups.NewConversationBackupHandler( + bc.producerConfig.ProtectedResource.ID(), + bc.apiCli.Conversations()) + colls []data.BackupCollection + ) + + pcfg := observe.ProgressCfg{ + Indent: 1, + CompletionMessage: func() string { return fmt.Sprintf("(found %d conversations)", len(colls)) }, + } + progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString()) + + defer close(progressBar) + + colls, canUsePreviousBackup, err := groups.CreateCollections( + ctx, + bc.producerConfig, + bh, + bc.creds.AzureTenantID, + scope, + bc.statusUpdater, + counter, + errs) + if err != nil { + return nil, clues.Stack(err) + } + + if !canUsePreviousBackup { + tp, err := bh.PathPrefix(bc.creds.AzureTenantID) + if err != nil { + err = clues.WrapWC(ctx, err, "getting conversation path").Label(count.BadPathPrefix) + return nil, err + } + + colls = append(colls, data.NewTombstoneCollection(tp, control.Options{}, counter)) + } + + return colls, nil } // ---------------------------------------------------------------------------