introduce backupProducerConfig (#3904)

Adds another `inject` container-of-things to hold the
common properties used by backup collection producers.

No logic changes, just code movement, renames, and placing things into structs.
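
For quick orientation, a minimal sketch of the shape change. The struct fields and the `ProduceBackupCollections` signature are taken from this diff; the wrapper function `produce` and its variable names are illustrative only, not code from this change:

```go
package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/common/idname"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/internal/operations/inject"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/selectors"
)

// produce shows a call site before and after the refactor: the producer
// now takes one config struct instead of five separate data arguments.
func produce(
	ctx context.Context,
	bp inject.BackupProducer,
	owner idname.Provider,
	sel selectors.Selector,
	metadata []data.RestoreCollection,
	lastBackupVersion int,
	opts control.Options,
	errs *fault.Bus,
) error {
	// Before:
	//   bp.ProduceBackupCollections(
	//       ctx, owner, sel, metadata, lastBackupVersion, opts, errs)

	// After: bundle the common inputs into the new config struct.
	bpc := inject.BackupProducerConfig{
		LastBackupVersion:   lastBackupVersion,
		MetadataCollections: metadata,
		Options:             opts,
		ProtectedResource:   owner,
		Selector:            sel,
	}

	_, _, _, err := bp.ProduceBackupCollections(ctx, bpc, errs)
	return err
}
```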

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
Keepers 2023-08-01 18:38:19 -06:00 committed by GitHub
parent 0f9d637074
commit fc6119064b
14 changed files with 668 additions and 635 deletions

View File

@ -5,7 +5,6 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
@ -13,7 +12,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/onedrive"
"github.com/alcionai/corso/src/internal/m365/sharepoint"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger"
@ -33,26 +32,24 @@ import (
// prior history (ie, incrementals) and run a full backup.
func (ctrl *Controller) ProduceBackupCollections(
ctx context.Context,
owner idname.Provider,
sels selectors.Selector,
metadata []data.RestoreCollection,
lastBackupVersion int,
ctrlOpts control.Options,
bpc inject.BackupProducerConfig,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) {
service := bpc.Selector.PathService()
ctx, end := diagnostics.Span(
ctx,
"m365:produceBackupCollections",
diagnostics.Index("service", sels.PathService().String()))
diagnostics.Index("service", bpc.Selector.PathService().String()))
defer end()
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: sels.PathService()})
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: service})
// Limit the max number of active requests to graph from this collection.
ctrlOpts.Parallelism.ItemFetch = graph.Parallelism(sels.PathService()).
ItemOverride(ctx, ctrlOpts.Parallelism.ItemFetch)
bpc.Options.Parallelism.ItemFetch = graph.Parallelism(service).
ItemOverride(ctx, bpc.Options.Parallelism.ItemFetch)
err := verifyBackupInputs(sels, ctrl.IDNameLookup.IDs())
err := verifyBackupInputs(bpc.Selector, ctrl.IDNameLookup.IDs())
if err != nil {
return nil, nil, false, clues.Stack(err).WithClues(ctx)
}
@ -60,8 +57,8 @@ func (ctrl *Controller) ProduceBackupCollections(
serviceEnabled, canMakeDeltaQueries, err := checkServiceEnabled(
ctx,
ctrl.AC.Users(),
sels.PathService(),
owner.ID())
service,
bpc.ProtectedResource.ID())
if err != nil {
return nil, nil, false, err
}
@ -79,58 +76,48 @@ func (ctrl *Controller) ProduceBackupCollections(
if !canMakeDeltaQueries {
logger.Ctx(ctx).Info("delta requests not available")
ctrlOpts.ToggleFeatures.DisableDelta = true
bpc.Options.ToggleFeatures.DisableDelta = true
}
switch sels.Service {
case selectors.ServiceExchange:
switch service {
case path.ExchangeService:
colls, ssmb, canUsePreviousBackup, err = exchange.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
sels,
ctrl.credentials.AzureTenantID,
owner,
metadata,
ctrl.UpdateStatus,
ctrlOpts,
errs)
if err != nil {
return nil, nil, false, err
}
case selectors.ServiceOneDrive:
case path.OneDriveService:
colls, ssmb, canUsePreviousBackup, err = onedrive.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
sels,
owner,
metadata,
lastBackupVersion,
ctrl.credentials.AzureTenantID,
ctrl.UpdateStatus,
ctrlOpts,
errs)
if err != nil {
return nil, nil, false, err
}
case selectors.ServiceSharePoint:
case path.SharePointService:
colls, ssmb, canUsePreviousBackup, err = sharepoint.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
sels,
owner,
metadata,
ctrl.credentials,
ctrl,
ctrlOpts,
errs)
if err != nil {
return nil, nil, false, err
}
default:
return nil, nil, false, clues.Wrap(clues.New(sels.Service.String()), "service not supported").WithClues(ctx)
return nil, nil, false, clues.Wrap(clues.New(service.String()), "service not supported").WithClues(ctx)
}
for _, c := range colls {

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/exchange"
"github.com/alcionai/corso/src/internal/m365/resource"
"github.com/alcionai/corso/src/internal/m365/sharepoint"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
@ -123,15 +124,20 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
ctrlOpts := control.DefaultOptions()
ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries
bpc := inject.BackupProducerConfig{
// exchange doesn't have any changes based on backup version yet.
LastBackupVersion: version.NoBackup,
Options: ctrlOpts,
ProtectedResource: uidn,
Selector: sel,
}
collections, excludes, canUsePreviousBackup, err := exchange.ProduceBackupCollections(
ctx,
bpc,
suite.ac,
sel,
suite.tenantID,
uidn,
nil,
ctrl.UpdateStatus,
ctrlOpts,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -233,13 +239,15 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner()
ctx, flush := tester.NewContext(t)
defer flush()
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: test.getSelector(t),
}
collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
test.getSelector(t),
test.getSelector(t),
nil,
version.NoBackup,
control.DefaultOptions(),
bpc,
fault.New(true))
assert.Error(t, err, clues.ToCore(err))
assert.False(t, canUsePreviousBackup, "can use previous backup")
@ -288,15 +296,18 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
sel := test.getSelector()
bpc := inject.BackupProducerConfig{
Options: control.DefaultOptions(),
ProtectedResource: sel,
Selector: sel,
}
collections, excludes, canUsePreviousBackup, err := sharepoint.ProduceBackupCollections(
ctx,
bpc,
suite.ac,
sel,
sel,
nil,
ctrl.credentials,
ctrl,
control.DefaultOptions(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -375,13 +386,16 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() {
sel.SetDiscreteOwnerIDName(id, name)
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(id, name),
Selector: sel.Selector,
}
cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
inMock.NewProvider(id, name),
sel.Selector,
nil,
version.NoBackup,
control.DefaultOptions(),
bpc,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -422,13 +436,16 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() {
sel.SetDiscreteOwnerIDName(id, name)
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(id, name),
Selector: sel.Selector,
}
cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
inMock.NewProvider(id, name),
sel.Selector,
nil,
version.NoBackup,
control.DefaultOptions(),
bpc,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")

View File

@ -587,14 +587,17 @@ func runBackupAndCompare(
backupSel := backupSelectorForExpected(t, sci.Service, expectedDests)
t.Logf("Selective backup of %s\n", backupSel)
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: sci.Opts,
ProtectedResource: backupSel,
Selector: backupSel,
}
start := time.Now()
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
backupSel,
backupSel,
nil,
version.NoBackup,
sci.Opts,
bpc,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -1168,13 +1171,16 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
backupSel := backupSelectorForExpected(t, test.service, expectedDests)
t.Log("Selective backup of", backupSel)
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: backupSel,
Selector: backupSel,
}
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
backupSel,
backupSel,
nil,
version.NoBackup,
control.DefaultOptions(),
bpc,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -1323,13 +1329,16 @@ func (suite *ControllerIntegrationSuite) TestBackup_CreatesPrefixCollections() {
backupSel.SetDiscreteOwnerIDName(id, name)
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(id, name),
Selector: backupSel,
}
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
idname.NewProvider(id, name),
backupSel,
nil,
version.NoBackup,
control.DefaultOptions(),
bpc,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")

View File

@ -6,7 +6,6 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/pii"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/ptr"
@ -14,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
@ -22,6 +22,451 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
// ProduceBackupCollections returns the set of BackupCollections which the
// caller can use to read mailbox data out for the specified user.
func ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
tenantID string,
su support.StatusUpdater,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
eb, err := bpc.Selector.ToExchangeBackup()
if err != nil {
return nil, nil, false, clues.Wrap(err, "exchange dataCollection selector").WithClues(ctx)
}
var (
collections = []data.BackupCollection{}
el = errs.Local()
categories = map[path.CategoryType]struct{}{}
handlers = BackupHandlers(ac)
)
// Turn on concurrency limiter middleware for exchange backups
// unless explicitly disabled through DisableConcurrencyLimiterFN cli flag
graph.InitializeConcurrencyLimiter(
ctx,
bpc.Options.ToggleFeatures.DisableConcurrencyLimiter,
bpc.Options.Parallelism.ItemFetch)
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, bpc.MetadataCollections)
if err != nil {
return nil, nil, false, err
}
ctx = clues.Add(ctx, "can_use_previous_backup", canUsePreviousBackup)
for _, scope := range eb.Scopes() {
if el.Failure() != nil {
break
}
dcs, err := createCollections(
ctx,
bpc,
handlers,
tenantID,
scope,
cdps[scope.Category().PathType()],
su,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
continue
}
categories[scope.Category().PathType()] = struct{}{}
collections = append(collections, dcs...)
}
if len(collections) > 0 {
baseCols, err := graph.BaseCollections(
ctx,
collections,
tenantID,
bpc.ProtectedResource.ID(),
path.ExchangeService,
categories,
su,
errs)
if err != nil {
return nil, nil, false, err
}
collections = append(collections, baseCols...)
}
return collections, nil, canUsePreviousBackup, el.Failure()
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
func createCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
handlers map[path.CategoryType]backupHandler,
tenantID string,
scope selectors.ExchangeScope,
dps DeltaPaths,
su support.StatusUpdater,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
var (
allCollections = make([]data.BackupCollection, 0)
category = scope.Category().PathType()
qp = graph.QueryParams{
Category: category,
ProtectedResource: bpc.ProtectedResource,
TenantID: tenantID,
}
)
handler, ok := handlers[category]
if !ok {
return nil, clues.New("unsupported backup category type").WithClues(ctx)
}
foldersComplete := observe.MessageWithCompletion(
ctx,
observe.Bulletf("%s", qp.Category))
defer close(foldersComplete)
rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID())
if err := cc.Populate(ctx, errs, rootFolder); err != nil {
return nil, clues.Wrap(err, "populating container cache")
}
collections, err := populateCollections(
ctx,
qp,
handler,
su,
cc,
scope,
dps,
bpc.Options,
errs)
if err != nil {
return nil, clues.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, coll := range collections {
allCollections = append(allCollections, coll)
}
return allCollections, nil
}
// populateCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a BackupCollection. Messages outside of those directories are omitted.
// The collections map is filled in during this function.
// Supports all exchange applications: Contacts, Events, and Mail
//
// TODO(ashmrtn): This should really return []data.BackupCollection but
// unfortunately some of our tests rely on being able to lookup returned
// collections by ID and it would be non-trivial to change them.
func populateCollections(
ctx context.Context,
qp graph.QueryParams,
bh backupHandler,
statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
scope selectors.ExchangeScope,
dps DeltaPaths,
ctrlOpts control.Options,
errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
var (
// folder ID -> BackupCollection.
collections = map[string]data.BackupCollection{}
// folder ID -> delta url or folder path lookups
deltaURLs = map[string]string{}
currPaths = map[string]string{}
// copy of previousPaths. any folder found in the resolver gets
// deleted from this map, leaving only the deleted folders behind
tombstones = makeTombstones(dps)
category = qp.Category
)
logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
el := errs.Local()
for _, c := range resolver.Items() {
if el.Failure() != nil {
return nil, el.Failure()
}
cID := ptr.Val(c.GetId())
delete(tombstones, cID)
var (
err error
dp = dps[cID]
prevDelta = dp.Delta
prevPathStr = dp.Path // do not log: pii; log prevPath instead
prevPath path.Path
ictx = clues.Add(
ctx,
"container_id", cID,
"previous_delta", pii.SafeURL{
URL: prevDelta,
SafePathElems: graph.SafeURLPathParams,
SafeQueryKeys: graph.SafeURLQueryParams,
})
)
currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category)
// Only create a collection if the path matches the scope.
if !ok {
continue
}
if len(prevPathStr) > 0 {
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
logger.CtxErr(ictx, err).Error("parsing prev path")
// if the previous path is unusable, then the delta must be, too.
prevDelta = ""
}
}
ictx = clues.Add(ictx, "previous_path", prevPath)
added, removed, newDelta, err := bh.itemEnumerator().
GetAddedAndRemovedItemIDs(
ictx,
qp.ProtectedResource.ID(),
cID,
prevDelta,
ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
!ctrlOpts.ToggleFeatures.DisableDelta)
if err != nil {
if !graph.IsErrDeletedInFlight(err) {
el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
continue
}
// race conditions happen, containers might get deleted while
// this process is in flight. If that happens, force the collection
// to reset. This prevents any old items from being retained in
// storage. If the container (or its children) are still missing
// on the next backup, they'll get tombstoned.
newDelta = api.DeltaUpdate{Reset: true}
}
if len(newDelta.URL) > 0 {
deltaURLs[cID] = newDelta.URL
} else if !newDelta.Reset {
logger.Ctx(ictx).Info("missing delta url")
}
edc := NewCollection(
qp.ProtectedResource.ID(),
currPath,
prevPath,
locPath,
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
newDelta.Reset)
collections[cID] = &edc
for _, add := range added {
edc.added[add] = struct{}{}
}
// Remove any deleted IDs from the set of added IDs because items that are
// deleted and then restored will have a different ID than they did
// originally.
for _, remove := range removed {
delete(edc.added, remove)
edc.removed[remove] = struct{}{}
}
// add the current path for the container ID to be used in the next backup
// as the "previous path", for reference in case of a rename or relocation.
currPaths[cID] = currPath.String()
}
// A tombstone is a folder that needs to be marked for deletion.
// The only situation where a tombstone should appear is if the folder exists
// in the `previousPath` set, but does not exist in the current container
// resolver (which contains all of the resource owner's current containers).
for id, p := range tombstones {
if el.Failure() != nil {
return nil, el.Failure()
}
var (
err error
ictx = clues.Add(ctx, "tombstone_id", id)
)
if collections[id] != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx))
continue
}
// only occurs if it was a new folder that we picked up during the container
// resolver phase that got deleted in flight by the time we hit this stage.
if len(p) == 0 {
continue
}
prevPath, err := pathFromPrevString(p)
if err != nil {
// technically shouldn't ever happen. But just in case...
logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
continue
}
edc := NewCollection(
qp.ProtectedResource.ID(),
nil, // marks the collection as deleted
prevPath,
nil, // tombstones don't need a location
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
false)
collections[id] = &edc
}
logger.Ctx(ctx).Infow(
"adding metadata collection entries",
"num_paths_entries", len(currPaths),
"num_deltas_entries", len(deltaURLs))
col, err := graph.MakeMetadataCollection(
qp.TenantID,
qp.ProtectedResource.ID(),
path.ExchangeService,
qp.Category,
[]graph.MetadataCollectionEntry{
graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
},
statusUpdater)
if err != nil {
return nil, clues.Wrap(err, "making metadata collection")
}
collections["metadata"] = col
return collections, el.Failure()
}
// makeTombstones produces a set of id:path pairs from the DeltaPaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
r := make(map[string]string, len(dps))
for id, v := range dps {
r[id] = v.Path
}
return r
}
func pathFromPrevString(ps string) (path.Path, error) {
p, err := path.FromDataLayerPath(ps, false)
if err != nil {
return nil, clues.Wrap(err, "parsing previous path string")
}
return p, nil
}
// includeContainer evaluates whether the container passes the scope comparison.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
ctx context.Context,
qp graph.QueryParams,
c graph.CachedContainer,
scope selectors.ExchangeScope,
category path.CategoryType,
) (path.Path, *path.Builder, bool) {
var (
directory string
locPath path.Path
pb = c.Path()
loc = c.Location()
)
// Clause ensures that DefaultContactFolder is inspected properly
if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts {
loc = loc.Append(api.DefaultContacts)
}
dirPath, err := pb.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ProtectedResource.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = dirPath.Folder(false)
if loc != nil {
locPath, err = loc.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ProtectedResource.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = locPath.Folder(false)
}
var ok bool
switch category {
case path.EmailCategory:
ok = scope.Matches(selectors.ExchangeMailFolder, directory)
case path.ContactsCategory:
ok = scope.Matches(selectors.ExchangeContactFolder, directory)
case path.EventsCategory:
ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
default:
return nil, nil, false
}
logger.Ctx(ctx).With(
"included", ok,
"scope", scope,
"matches_input", directory,
).Debug("backup folder selection filter")
return dirPath, loc, ok
}
// ---------------------------------------------------------------------------
// metadata collection parsing
// ---------------------------------------------------------------------------
// MetadataFileNames produces the category-specific set of filenames used to
// store graph metadata such as delta tokens and folderID->path references.
func MetadataFileNames(cat path.CategoryType) []string {
@ -171,449 +616,3 @@ func parseMetadataCollections(
return cdp, true, nil
}
// ProduceBackupCollections returns the set of BackupCollections which the
// caller can use to read mailbox data out for the specified user.
func ProduceBackupCollections(
ctx context.Context,
ac api.Client,
selector selectors.Selector,
tenantID string,
user idname.Provider,
metadata []data.RestoreCollection,
su support.StatusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
return nil, nil, false, clues.Wrap(err, "exchange dataCollection selector").WithClues(ctx)
}
var (
collections = []data.BackupCollection{}
el = errs.Local()
categories = map[path.CategoryType]struct{}{}
handlers = BackupHandlers(ac)
)
// Turn on concurrency limiter middleware for exchange backups
// unless explicitly disabled through DisableConcurrencyLimiterFN cli flag
graph.InitializeConcurrencyLimiter(
ctx,
ctrlOpts.ToggleFeatures.DisableConcurrencyLimiter,
ctrlOpts.Parallelism.ItemFetch)
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, metadata)
if err != nil {
return nil, nil, false, err
}
ctx = clues.Add(ctx, "can_use_previous_backup", canUsePreviousBackup)
for _, scope := range eb.Scopes() {
if el.Failure() != nil {
break
}
dcs, err := createCollections(
ctx,
handlers,
tenantID,
user,
scope,
cdps[scope.Category().PathType()],
ctrlOpts,
su,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
continue
}
categories[scope.Category().PathType()] = struct{}{}
collections = append(collections, dcs...)
}
if len(collections) > 0 {
baseCols, err := graph.BaseCollections(
ctx,
collections,
tenantID,
user.ID(),
path.ExchangeService,
categories,
su,
errs)
if err != nil {
return nil, nil, false, err
}
collections = append(collections, baseCols...)
}
return collections, nil, canUsePreviousBackup, el.Failure()
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
func createCollections(
ctx context.Context,
handlers map[path.CategoryType]backupHandler,
tenantID string,
user idname.Provider,
scope selectors.ExchangeScope,
dps DeltaPaths,
ctrlOpts control.Options,
su support.StatusUpdater,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
var (
allCollections = make([]data.BackupCollection, 0)
category = scope.Category().PathType()
qp = graph.QueryParams{
Category: category,
ResourceOwner: user,
TenantID: tenantID,
}
)
handler, ok := handlers[category]
if !ok {
return nil, clues.New("unsupported backup category type").WithClues(ctx)
}
foldersComplete := observe.MessageWithCompletion(
ctx,
observe.Bulletf("%s", qp.Category))
defer close(foldersComplete)
rootFolder, cc := handler.NewContainerCache(user.ID())
if err := cc.Populate(ctx, errs, rootFolder); err != nil {
return nil, clues.Wrap(err, "populating container cache")
}
collections, err := populateCollections(
ctx,
qp,
handler,
su,
cc,
scope,
dps,
ctrlOpts,
errs)
if err != nil {
return nil, clues.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, coll := range collections {
allCollections = append(allCollections, coll)
}
return allCollections, nil
}
// populateCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a BackupCollection. Messages outside of those directories are omitted.
// The collections map is filled in during this function.
// Supports all exchange applications: Contacts, Events, and Mail
//
// TODO(ashmrtn): This should really return []data.BackupCollection but
// unfortunately some of our tests rely on being able to lookup returned
// collections by ID and it would be non-trivial to change them.
func populateCollections(
ctx context.Context,
qp graph.QueryParams,
bh backupHandler,
statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
scope selectors.ExchangeScope,
dps DeltaPaths,
ctrlOpts control.Options,
errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
var (
// folder ID -> BackupCollection.
collections = map[string]data.BackupCollection{}
// folder ID -> delta url or folder path lookups
deltaURLs = map[string]string{}
currPaths = map[string]string{}
// copy of previousPaths. any folder found in the resolver gets
// deleted from this map, leaving only the deleted folders behind
tombstones = makeTombstones(dps)
category = qp.Category
)
logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
el := errs.Local()
for _, c := range resolver.Items() {
if el.Failure() != nil {
return nil, el.Failure()
}
cID := ptr.Val(c.GetId())
delete(tombstones, cID)
var (
err error
dp = dps[cID]
prevDelta = dp.Delta
prevPathStr = dp.Path // do not log: pii; log prevPath instead
prevPath path.Path
ictx = clues.Add(
ctx,
"container_id", cID,
"previous_delta", pii.SafeURL{
URL: prevDelta,
SafePathElems: graph.SafeURLPathParams,
SafeQueryKeys: graph.SafeURLQueryParams,
})
)
currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category)
// Only create a collection if the path matches the scope.
if !ok {
continue
}
if len(prevPathStr) > 0 {
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
logger.CtxErr(ictx, err).Error("parsing prev path")
// if the previous path is unusable, then the delta must be, too.
prevDelta = ""
}
}
ictx = clues.Add(ictx, "previous_path", prevPath)
added, removed, newDelta, err := bh.itemEnumerator().
GetAddedAndRemovedItemIDs(
ictx,
qp.ResourceOwner.ID(),
cID,
prevDelta,
ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
!ctrlOpts.ToggleFeatures.DisableDelta)
if err != nil {
if !graph.IsErrDeletedInFlight(err) {
el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
continue
}
// race conditions happen, containers might get deleted while
// this process is in flight. If that happens, force the collection
// to reset. This prevents any old items from being retained in
// storage. If the container (or its children) are still missing
// on the next backup, they'll get tombstoned.
newDelta = api.DeltaUpdate{Reset: true}
}
if len(newDelta.URL) > 0 {
deltaURLs[cID] = newDelta.URL
} else if !newDelta.Reset {
logger.Ctx(ictx).Info("missing delta url")
}
edc := NewCollection(
qp.ResourceOwner.ID(),
currPath,
prevPath,
locPath,
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
newDelta.Reset)
collections[cID] = &edc
for _, add := range added {
edc.added[add] = struct{}{}
}
// Remove any deleted IDs from the set of added IDs because items that are
// deleted and then restored will have a different ID than they did
// originally.
for _, remove := range removed {
delete(edc.added, remove)
edc.removed[remove] = struct{}{}
}
// add the current path for the container ID to be used in the next backup
// as the "previous path", for reference in case of a rename or relocation.
currPaths[cID] = currPath.String()
}
// A tombstone is a folder that needs to be marked for deletion.
// The only situation where a tombstone should appear is if the folder exists
// in the `previousPath` set, but does not exist in the current container
// resolver (which contains all of the resource owner's current containers).
for id, p := range tombstones {
if el.Failure() != nil {
return nil, el.Failure()
}
var (
err error
ictx = clues.Add(ctx, "tombstone_id", id)
)
if collections[id] != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx))
continue
}
// only occurs if it was a new folder that we picked up during the container
// resolver phase that got deleted in flight by the time we hit this stage.
if len(p) == 0 {
continue
}
prevPath, err := pathFromPrevString(p)
if err != nil {
// technically shouldn't ever happen. But just in case...
logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
continue
}
edc := NewCollection(
qp.ResourceOwner.ID(),
nil, // marks the collection as deleted
prevPath,
nil, // tombstones don't need a location
category,
bh.itemHandler(),
statusUpdater,
ctrlOpts,
false)
collections[id] = &edc
}
logger.Ctx(ctx).Infow(
"adding metadata collection entries",
"num_paths_entries", len(currPaths),
"num_deltas_entries", len(deltaURLs))
col, err := graph.MakeMetadataCollection(
qp.TenantID,
qp.ResourceOwner.ID(),
path.ExchangeService,
qp.Category,
[]graph.MetadataCollectionEntry{
graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
},
statusUpdater)
if err != nil {
return nil, clues.Wrap(err, "making metadata collection")
}
collections["metadata"] = col
return collections, el.Failure()
}
// makeTombstones produces a set of id:path pairs from the DeltaPaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
r := make(map[string]string, len(dps))
for id, v := range dps {
r[id] = v.Path
}
return r
}
func pathFromPrevString(ps string) (path.Path, error) {
p, err := path.FromDataLayerPath(ps, false)
if err != nil {
return nil, clues.Wrap(err, "parsing previous path string")
}
return p, nil
}
// includeContainer evaluates whether the container passes the scope comparison.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
ctx context.Context,
qp graph.QueryParams,
c graph.CachedContainer,
scope selectors.ExchangeScope,
category path.CategoryType,
) (path.Path, *path.Builder, bool) {
var (
directory string
locPath path.Path
pb = c.Path()
loc = c.Location()
)
// Clause ensures that DefaultContactFolder is inspected properly
if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts {
loc = loc.Append(api.DefaultContacts)
}
dirPath, err := pb.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = dirPath.Folder(false)
if loc != nil {
locPath, err = loc.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = locPath.Folder(false)
}
var ok bool
switch category {
case path.EmailCategory:
ok = scope.Matches(selectors.ExchangeMailFolder, directory)
case path.ContactsCategory:
ok = scope.Matches(selectors.ExchangeContactFolder, directory)
case path.EventsCategory:
ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
default:
return nil, nil, false
}
logger.Ctx(ctx).With(
"included", ok,
"scope", scope,
"matches_input", directory,
).Debug("backup folder selection filter")
return dirPath, loc, ok
}

View File

@ -16,8 +16,10 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
@ -469,14 +471,19 @@ func (suite *BackupIntgSuite) TestMailFetch() {
ctrlOpts := control.DefaultOptions()
ctrlOpts.ToggleFeatures.DisableDelta = !test.canMakeDeltaQueries
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: ctrlOpts,
ProtectedResource: inMock.NewProvider(userID, userID),
}
collections, err := createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(userID, userID),
test.scope,
DeltaPaths{},
ctrlOpts,
func(status *support.ControllerOperationStatus) {},
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -546,15 +553,20 @@ func (suite *BackupIntgSuite) TestDelta() {
ctx, flush := tester.NewContext(t)
defer flush()
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(userID, userID),
}
// get collections without providing any delta history (ie: full backup)
collections, err := createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(userID, userID),
test.scope,
DeltaPaths{},
control.DefaultOptions(),
func(status *support.ControllerOperationStatus) {},
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -582,12 +594,11 @@ func (suite *BackupIntgSuite) TestDelta() {
// which should only contain the difference.
collections, err = createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(userID, userID),
test.scope,
dps,
control.DefaultOptions(),
func(status *support.ControllerOperationStatus) {},
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -626,14 +637,20 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() {
sel := selectors.NewExchangeBackup(users)
sel.Include(sel.MailFolders([]string{api.MailInbox}, selectors.PrefixMatch()))
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(suite.user, suite.user),
Selector: sel.Selector,
}
collections, err := createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user),
sel.Scopes()[0],
DeltaPaths{},
control.DefaultOptions(),
newStatusUpdater(t, &wg),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -702,14 +719,19 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
var wg sync.WaitGroup
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(suite.user, suite.user),
}
edcs, err := createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user),
test.scope,
DeltaPaths{},
control.DefaultOptions(),
newStatusUpdater(t, &wg),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -827,14 +849,19 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() {
var wg sync.WaitGroup
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: inMock.NewProvider(suite.user, suite.user),
}
collections, err := createCollections(
ctx,
bpc,
handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user),
test.scope,
DeltaPaths{},
control.DefaultOptions(),
newStatusUpdater(t, &wg),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -894,7 +921,7 @@ func (suite *CollectionPopulationSuite) TestPopulateCollections() {
var (
qp = graph.QueryParams{
Category: path.EmailCategory, // doesn't matter which one we use.
ResourceOwner: inMock.NewProvider("user_id", "user_name"),
ProtectedResource: inMock.NewProvider("user_id", "user_name"),
TenantID: suite.creds.AzureTenantID,
}
statusUpdater = func(*support.ControllerOperationStatus) {}
@ -1189,7 +1216,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
var (
qp = graph.QueryParams{
ResourceOwner: inMock.NewProvider("user_id", "user_name"),
ProtectedResource: inMock.NewProvider("user_id", "user_name"),
TenantID: suite.creds.AzureTenantID,
}
@ -1240,7 +1267,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
oldPath1 := func(t *testing.T, cat path.CategoryType) path.Path {
res, err := location.Append("1").ToDataLayerPath(
suite.creds.AzureTenantID,
qp.ResourceOwner.ID(),
qp.ProtectedResource.ID(),
path.ExchangeService,
cat,
false)
@ -1252,7 +1279,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
oldPath2 := func(t *testing.T, cat path.CategoryType) path.Path {
res, err := location.Append("2").ToDataLayerPath(
suite.creds.AzureTenantID,
qp.ResourceOwner.ID(),
qp.ProtectedResource.ID(),
path.ExchangeService,
cat,
false)
@ -1264,7 +1291,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
idPath1 := func(t *testing.T, cat path.CategoryType) path.Path {
res, err := path.Builder{}.Append("1").ToDataLayerPath(
suite.creds.AzureTenantID,
qp.ResourceOwner.ID(),
qp.ProtectedResource.ID(),
path.ExchangeService,
cat,
false)
@ -1276,7 +1303,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
idPath2 := func(t *testing.T, cat path.CategoryType) path.Path {
res, err := path.Builder{}.Append("2").ToDataLayerPath(
suite.creds.AzureTenantID,
qp.ResourceOwner.ID(),
qp.ProtectedResource.ID(),
path.ExchangeService,
cat,
false)
@ -1575,7 +1602,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_r
var (
qp = graph.QueryParams{
Category: path.EmailCategory, // doesn't matter which one we use.
ResourceOwner: inMock.NewProvider("user_id", "user_name"),
ProtectedResource: inMock.NewProvider("user_id", "user_name"),
TenantID: suite.creds.AzureTenantID,
}
statusUpdater = func(*support.ControllerOperationStatus) {}
@ -1594,8 +1621,8 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_r
}
)
require.Equal(t, "user_id", qp.ResourceOwner.ID(), qp.ResourceOwner)
require.Equal(t, "user_name", qp.ResourceOwner.Name(), qp.ResourceOwner)
require.Equal(t, "user_id", qp.ProtectedResource.ID(), qp.ProtectedResource)
require.Equal(t, "user_name", qp.ProtectedResource.Name(), qp.ProtectedResource)
collections, err := populateCollections(
ctx,
@ -1660,7 +1687,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_i
cat = path.EmailCategory // doesn't matter which one we use,
qp = graph.QueryParams{
Category: cat,
ResourceOwner: inMock.NewProvider("user_id", "user_name"),
ProtectedResource: inMock.NewProvider("user_id", "user_name"),
TenantID: suite.creds.AzureTenantID,
}
statusUpdater = func(*support.ControllerOperationStatus) {}

View File

@ -39,7 +39,7 @@ func AllMetadataFileNames() []string {
type QueryParams struct {
Category path.CategoryType
ResourceOwner idname.Provider
ProtectedResource idname.Provider
TenantID string
}

View File

@ -35,11 +35,7 @@ type Controller struct {
func (ctrl Controller) ProduceBackupCollections(
_ context.Context,
_ idname.Provider,
_ selectors.Selector,
_ []data.RestoreCollection,
_ int,
_ control.Options,
_ inject.BackupProducerConfig,
_ *fault.Bus,
) (
[]data.BackupCollection,

View File

@ -5,33 +5,27 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
func ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
selector selectors.Selector,
user idname.Provider,
metadata []data.RestoreCollection,
lastBackupVersion int,
tenant string,
su support.StatusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
odb, err := selector.ToOneDriveBackup()
odb, err := bpc.Selector.ToOneDriveBackup()
if err != nil {
return nil, nil, false, clues.Wrap(err, "parsing selector").WithClues(ctx)
}
@ -56,11 +50,11 @@ func ProduceBackupCollections(
nc := NewCollections(
&itemBackupHandler{ac.Drives(), scope},
tenant,
user.ID(),
bpc.ProtectedResource.ID(),
su,
ctrlOpts)
bpc.Options)
odcs, canUsePreviousBackup, err = nc.Get(ctx, metadata, ssmb, errs)
odcs, canUsePreviousBackup, err = nc.Get(ctx, bpc.MetadataCollections, ssmb, errs)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
}
@ -70,12 +64,7 @@ func ProduceBackupCollections(
collections = append(collections, odcs...)
}
mcs, err := migrationCollections(
lastBackupVersion,
tenant,
user,
su,
ctrlOpts)
mcs, err := migrationCollections(bpc, tenant, su)
if err != nil {
return nil, nil, false, err
}
@ -87,7 +76,7 @@ func ProduceBackupCollections(
ctx,
collections,
tenant,
user.ID(),
bpc.ProtectedResource.ID(),
path.OneDriveService,
categories,
su,
@ -104,18 +93,16 @@ func ProduceBackupCollections(
// migrationCollections adds data migrations to the collection set.
func migrationCollections(
lastBackupVersion int,
bpc inject.BackupProducerConfig,
tenant string,
user idname.Provider,
su support.StatusUpdater,
ctrlOpts control.Options,
) ([]data.BackupCollection, error) {
// assume a version < 0 implies no prior backup, thus nothing to migrate.
if version.IsNoBackup(lastBackupVersion) {
if version.IsNoBackup(bpc.LastBackupVersion) {
return nil, nil
}
if lastBackupVersion >= version.All8MigrateUserPNToID {
if bpc.LastBackupVersion >= version.All8MigrateUserPNToID {
return nil, nil
}
@ -123,7 +110,7 @@ func migrationCollections(
// backup, onedrive needs to force the owner PN -> ID migration
mc, err := path.ServicePrefix(
tenant,
user.ID(),
bpc.ProtectedResource.ID(),
path.OneDriveService,
path.FilesCategory)
if err != nil {
@ -132,7 +119,7 @@ func migrationCollections(
mpc, err := path.ServicePrefix(
tenant,
user.Name(),
bpc.ProtectedResource.Name(),
path.OneDriveService,
path.FilesCategory)
if err != nil {

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
@ -85,7 +86,13 @@ func (suite *BackupUnitSuite) TestMigrationCollections() {
ToggleFeatures: control.Toggles{},
}
mc, err := migrationCollections(test.version, "t", u, nil, opts)
bpc := inject.BackupProducerConfig{
LastBackupVersion: test.version,
Options: opts,
ProtectedResource: u,
}
mc, err := migrationCollections(bpc, "t", nil)
require.NoError(t, err, clues.ToCore(err))
if test.expectLen == 0 {

View File

@ -5,7 +5,6 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
@ -13,8 +12,8 @@ import (
betaAPI "github.com/alcionai/corso/src/internal/m365/sharepoint/api"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -30,25 +29,17 @@ type statusUpdater interface {
// for the specified user
func ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
selector selectors.Selector,
site idname.Provider,
metadata []data.RestoreCollection,
creds account.M365Config,
su statusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
b, err := selector.ToSharePointBackup()
b, err := bpc.Selector.ToSharePointBackup()
if err != nil {
return nil, nil, false, clues.Wrap(err, "sharePointDataCollection: parsing selector")
}
ctx = clues.Add(
ctx,
"site_id", clues.Hide(site.ID()),
"site_url", clues.Hide(site.Name()))
var (
el = errs.Local()
collections = []data.BackupCollection{}
@ -57,6 +48,11 @@ func ProduceBackupCollections(
canUsePreviousBackup bool
)
ctx = clues.Add(
ctx,
"site_id", clues.Hide(bpc.ProtectedResource.ID()),
"site_url", clues.Hide(bpc.ProtectedResource.Name()))
for _, scope := range b.Scopes() {
if el.Failure() != nil {
break
@ -73,11 +69,10 @@ func ProduceBackupCollections(
case path.ListsCategory:
spcs, err = collectLists(
ctx,
bpc,
ac,
creds.AzureTenantID,
site,
su,
ctrlOpts,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
@ -91,14 +86,12 @@ func ProduceBackupCollections(
case path.LibrariesCategory:
spcs, canUsePreviousBackup, err = collectLibraries(
ctx,
bpc,
ac.Drives(),
creds.AzureTenantID,
site,
metadata,
ssmb,
scope,
su,
ctrlOpts,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
@ -108,11 +101,10 @@ func ProduceBackupCollections(
case path.PagesCategory:
spcs, err = collectPages(
ctx,
bpc,
creds,
ac,
site,
su,
ctrlOpts,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
@ -135,7 +127,7 @@ func ProduceBackupCollections(
ctx,
collections,
creds.AzureTenantID,
site.ID(),
bpc.ProtectedResource.ID(),
path.SharePointService,
categories,
su.UpdateStatus,
@ -152,11 +144,10 @@ func ProduceBackupCollections(
func collectLists(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
tenantID string,
site idname.Provider,
updater statusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
logger.Ctx(ctx).Debug("Creating SharePoint List Collections")
@ -166,7 +157,7 @@ func collectLists(
spcs = make([]data.BackupCollection, 0)
)
lists, err := preFetchLists(ctx, ac.Stable, site.ID())
lists, err := preFetchLists(ctx, ac.Stable, bpc.ProtectedResource.ID())
if err != nil {
return nil, err
}
@ -178,7 +169,7 @@ func collectLists(
dir, err := path.Build(
tenantID,
site.ID(),
bpc.ProtectedResource.ID(),
path.SharePointService,
path.ListsCategory,
false,
@ -192,7 +183,7 @@ func collectLists(
ac,
List,
updater.UpdateStatus,
ctrlOpts)
bpc.Options)
collection.AddJob(tuple.id)
spcs = append(spcs, collection)
@ -205,14 +196,12 @@ func collectLists(
// all the drives associated with the site.
func collectLibraries(
ctx context.Context,
bpc inject.BackupProducerConfig,
ad api.Drives,
tenantID string,
site idname.Provider,
metadata []data.RestoreCollection,
ssmb *prefixmatcher.StringSetMatchBuilder,
scope selectors.SharePointScope,
updater statusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
logger.Ctx(ctx).Debug("creating SharePoint Library collections")
@ -222,12 +211,12 @@ func collectLibraries(
colls = onedrive.NewCollections(
&libraryBackupHandler{ad, scope},
tenantID,
site.ID(),
bpc.ProtectedResource.ID(),
updater.UpdateStatus,
ctrlOpts)
bpc.Options)
)
odcs, canUsePreviousBackup, err := colls.Get(ctx, metadata, ssmb, errs)
odcs, canUsePreviousBackup, err := colls.Get(ctx, bpc.MetadataCollections, ssmb, errs)
if err != nil {
return nil, false, graph.Wrap(ctx, err, "getting library")
}
@ -239,11 +228,10 @@ func collectLibraries(
// M365 IDs for the associated Pages.
func collectPages(
ctx context.Context,
bpc inject.BackupProducerConfig,
creds account.M365Config,
ac api.Client,
site idname.Provider,
updater statusUpdater,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
logger.Ctx(ctx).Debug("creating SharePoint Pages collections")
@ -265,7 +253,7 @@ func collectPages(
betaService := betaAPI.NewBetaService(adpt)
tuples, err := betaAPI.FetchPages(ctx, betaService, site.ID())
tuples, err := betaAPI.FetchPages(ctx, betaService, bpc.ProtectedResource.ID())
if err != nil {
return nil, err
}
@ -277,7 +265,7 @@ func collectPages(
dir, err := path.Build(
creds.AzureTenantID,
site.ID(),
bpc.ProtectedResource.ID(),
path.SharePointService,
path.PagesCategory,
false,
@ -291,7 +279,7 @@ func collectPages(
ac,
Pages,
updater.UpdateStatus,
ctrlOpts)
bpc.Options)
collection.betaService = betaService
collection.AddJob(tuple.ID)

View File

@ -13,8 +13,10 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/onedrive"
odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -204,13 +206,18 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
ac, err := api.NewClient(creds, control.DefaultOptions())
require.NoError(t, err, clues.ToCore(err))
bpc := inject.BackupProducerConfig{
LastBackupVersion: version.NoBackup,
Options: control.DefaultOptions(),
ProtectedResource: mock.NewProvider(siteID, siteID),
}
col, err := collectPages(
ctx,
bpc,
creds,
ac,
mock.NewProvider(siteID, siteID),
&MockGraphService{},
control.DefaultOptions(),
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
assert.NotEmpty(t, col)

View File

@ -392,7 +392,7 @@ func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool {
func produceBackupDataCollections(
ctx context.Context,
bp inject.BackupProducer,
resourceOwner idname.Provider,
protectedResource idname.Provider,
sel selectors.Selector,
metadata []data.RestoreCollection,
lastBackupVersion int,
@ -405,14 +405,15 @@ func produceBackupDataCollections(
close(complete)
}()
return bp.ProduceBackupCollections(
ctx,
resourceOwner,
sel,
metadata,
lastBackupVersion,
ctrlOpts,
errs)
bpc := inject.BackupProducerConfig{
LastBackupVersion: lastBackupVersion,
MetadataCollections: metadata,
Options: ctrlOpts,
ProtectedResource: protectedResource,
Selector: sel,
}
return bp.ProduceBackupCollections(ctx, bpc, errs)
}
// ---------------------------------------------------------------------------

View File

@ -2,12 +2,13 @@ package inject
import (
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/selectors"
)
// RestoreConsumerConfig is a container-of-things for holding options and
// configurations from various packages, which are widely used by all
// configurations from various packages, all of which are widely used by
// restore consumers independent of service or data category.
type RestoreConsumerConfig struct {
BackupVersion int
@ -16,3 +17,14 @@ type RestoreConsumerConfig struct {
RestoreConfig control.RestoreConfig
Selector selectors.Selector
}
// BackupProducerConfig is a container-of-things for holding options and
// configurations from various packages, all of which are widely used by
// backup producers independent of service or data category.
type BackupProducerConfig struct {
LastBackupVersion int
MetadataCollections []data.RestoreCollection
Options control.Options
ProtectedResource idname.Provider
Selector selectors.Selector
}

View File

@ -22,11 +22,7 @@ type (
BackupProducer interface {
ProduceBackupCollections(
ctx context.Context,
resourceOwner idname.Provider,
sels selectors.Selector,
metadata []data.RestoreCollection,
lastBackupVersion int,
ctrlOpts control.Options,
bpc BackupProducerConfig,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error)
IsBackupRunnable(ctx context.Context, service path.ServiceType, resourceOwner string) (bool, error)