add boilerplate for drive tree processing (#4716)

Begins implementation of the drive delta tree support by adding boilerplate funcs for getting the backup data, and adding a framework of unit tests that will be used to bring testing parity up to par with the current tests.

---

#### Does this PR need a docs update or release note?

- [x] ⛔ No

#### Issue(s)

* #4689

#### Test Plan

- [x] ⚡ Unit test
parent 793658c790
commit 6aff258c8b
@@ -25,6 +25,23 @@ const (
	DeletedState CollectionState = 3
)

func (cs CollectionState) String() string {
	s := "Unknown State"

	switch cs {
	case 0:
		s = "New"
	case 1:
		s = "Not Moved"
	case 2:
		s = "Moved"
	case 3:
		s = "Deleted"
	}

	return s
}

type FetchRestoreCollection struct {
	Collection
	FetchItemByNamer
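For reference, a minimal standalone sketch of the same Stringer written against named constants instead of raw integers. NotMovedState and DeletedState appear elsewhere in this commit; NewState and MovedState are assumed names used only for this illustration.

package main

import "fmt"

// CollectionState mirrors the type in the hunk above.
type CollectionState int

const (
	NewState      CollectionState = 0 // assumed name
	NotMovedState CollectionState = 1
	MovedState    CollectionState = 2 // assumed name
	DeletedState  CollectionState = 3
)

// String keys the switch on the named constants, so the mapping stays readable
// if the constants are ever reordered.
func (cs CollectionState) String() string {
	switch cs {
	case NewState:
		return "New"
	case NotMovedState:
		return "Not Moved"
	case MovedState:
		return "Moved"
	case DeletedState:
		return "Deleted"
	default:
		return "Unknown State"
	}
}

func main() {
	// CollectionState satisfies fmt.Stringer, so %v prints the label.
	fmt.Printf("state: %v\n", DeletedState) // state: Deleted
}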
@@ -291,6 +291,17 @@ func (c *Collections) Get(
	ssmb *prefixmatcher.StringSetMatchBuilder,
	errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
	if c.ctrl.ToggleFeatures.UseDeltaTree {
		_, _, err := c.getTree(ctx, prevMetadata, ssmb, errs)
		if err != nil {
			return nil, false, clues.Wrap(err, "processing backup using tree")
		}

		return nil,
			false,
			clues.New("forced error: cannot run tree-based backup: incomplete implementation")
	}

	deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata(
		ctx,
		prevMetadata,
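Callers opt into the tree-based path through the feature toggle carried on control.Options, which is exactly how the test helper later in this commit builds its Collections. A minimal sketch, assuming only that the control package builds on its own:

package main

import (
	"fmt"

	"github.com/alcionai/corso/src/pkg/control"
)

func main() {
	// Enabling the toggle routes Collections.Get through getTree above. Until
	// the tree walk is implemented, that path deliberately returns a forced error.
	opts := control.Options{
		ToggleFeatures: control.Toggles{
			UseDeltaTree: true,
		},
	}

	fmt.Println("tree-based drive backup enabled:", opts.ToggleFeatures.UseDeltaTree)
}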
@@ -750,87 +761,6 @@ func (c *Collections) getCollectionPath(
	return collectionPath, nil
}

type driveEnumerationStats struct {
	numPages      int
	numAddedFiles int
	numContainers int
	numBytes      int64
}

func newPagerLimiter(opts control.Options) *pagerLimiter {
	res := &pagerLimiter{limits: opts.PreviewLimits}

	if res.limits.MaxContainers == 0 {
		res.limits.MaxContainers = defaultPreviewMaxContainers
	}

	if res.limits.MaxItemsPerContainer == 0 {
		res.limits.MaxItemsPerContainer = defaultPreviewMaxItemsPerContainer
	}

	if res.limits.MaxItems == 0 {
		res.limits.MaxItems = defaultPreviewMaxItems
	}

	if res.limits.MaxBytes == 0 {
		res.limits.MaxBytes = defaultPreviewMaxBytes
	}

	if res.limits.MaxPages == 0 {
		res.limits.MaxPages = defaultPreviewMaxPages
	}

	return res
}

type pagerLimiter struct {
	limits control.PreviewItemLimits
}

func (l pagerLimiter) effectiveLimits() control.PreviewItemLimits {
	return l.limits
}

func (l pagerLimiter) enabled() bool {
	return l.limits.Enabled
}

// sizeLimit returns the total number of bytes this backup should try to
// contain.
func (l pagerLimiter) sizeLimit() int64 {
	return l.limits.MaxBytes
}

// atItemLimit returns true if the limiter is enabled and has reached the limit
// for individual items added to collections for this backup.
func (l pagerLimiter) atItemLimit(stats *driveEnumerationStats) bool {
	return l.enabled() &&
		(stats.numAddedFiles >= l.limits.MaxItems ||
			stats.numBytes >= l.limits.MaxBytes)
}

// atContainerItemsLimit returns true if the limiter is enabled and the current
// number of items is above the limit for the number of items for a container
// for this backup.
func (l pagerLimiter) atContainerItemsLimit(numItems int) bool {
	return l.enabled() && numItems >= l.limits.MaxItemsPerContainer
}

// atContainerPageLimit returns true if the limiter is enabled and the number of
// pages processed so far is beyond the limit for this backup.
func (l pagerLimiter) atPageLimit(stats *driveEnumerationStats) bool {
	return l.enabled() && stats.numPages >= l.limits.MaxPages
}

// atLimit returns true if the limiter is enabled and meets any of the
// conditions for max items, containers, etc for this backup.
func (l pagerLimiter) atLimit(stats *driveEnumerationStats) bool {
	return l.enabled() &&
		(l.atItemLimit(stats) ||
			stats.numContainers >= l.limits.MaxContainers ||
			stats.numPages >= l.limits.MaxPages)
}

// PopulateDriveCollections initializes and adds the provided drive items to Collections
// A new collection is created for every drive folder.
// Along with populating the collection items and updating the excluded item IDs, this func

@@ -926,7 +856,7 @@ func (c *Collections) PopulateDriveCollections(
		// Don't check for containers we've already seen.
		if _, ok := c.CollectionMap[driveID][id]; !ok {
			if id != lastContainerID {
				if limiter.atLimit(stats) {
				if limiter.atLimit(stats, ignoreMe) {
					break
				}
File diff suppressed because it is too large
src/internal/m365/collection/drive/collections_tree.go (new file, 326 lines)
@@ -0,0 +1,326 @@
package drive

import (
	"context"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"

	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/data"
	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
	"github.com/alcionai/corso/src/pkg/count"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/logger"
	"github.com/alcionai/corso/src/pkg/services/m365/api"
	"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
	"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
)

// this file is used to separate the collections handling between the previous
// (list-based) design, and the in-progress (tree-based) redesign.
// see: https://github.com/alcionai/corso/issues/4688

func (c *Collections) getTree(
	ctx context.Context,
	prevMetadata []data.RestoreCollection,
	ssmb *prefixmatcher.StringSetMatchBuilder,
	errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
	ctx = clues.AddTraceName(ctx, "GetTree")

	// extract the previous backup's metadata, like the deltaToken urls and previousPath maps.
	// We'll need these to reconstruct / ensure the correct state of the world, after
	// enumerating through all the delta changes.
	deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata(
		ctx,
		prevMetadata,
		c.counter,
		errs)
	if err != nil {
		return nil, false, err
	}

	ctx = clues.Add(ctx, "can_use_previous_backup", canUsePrevBackup)

	// in sharepoint, it's possible to delete an entire drive.
	// if we don't see a previously-existing drive in the drives enumeration,
	// we assume it was deleted and will remove it from storage using a tombstone.
	driveTombstones := map[string]struct{}{}

	for driveID := range prevPathsByDriveID {
		driveTombstones[driveID] = struct{}{}
	}

	pager := c.handler.NewDrivePager(c.protectedResource.ID(), nil)

	drives, err := api.GetAllDrives(ctx, pager)
	if err != nil {
		return nil, false, err
	}

	c.counter.Add(count.Drives, int64(len(drives)))
	c.counter.Add(count.PrevDeltas, int64(len(deltasByDriveID)))

	var (
		el                    = errs.Local()
		collections           = []data.BackupCollection{}
		driveIDToNewDeltaLink = map[string]string{}
		driveIDToNewPrevPaths = map[string]map[string]string{}
	)

	// each drive owns its own delta history. We can't go more granular than that.
	// so our first order of business is to enumerate each drive's delta data, and
	// to use that as the basis for our backups.
	for _, drv := range drives {
		if el.Failure() != nil {
			break
		}

		var (
			driveID = ptr.Val(drv.GetId())
			cl      = c.counter.Local()
			ictx    = clues.Add(
				ctx,
				"drive_id", driveID,
				"drive_name", clues.Hide(ptr.Val(drv.GetName())))
		)

		ictx = clues.AddLabelCounter(ictx, cl.PlainAdder())

		// all the magic happens here. expectations are that this process will:
		// - iterate over all data (new or delta, as needed) in the drive
		// - condense that data into a set of collections to backup
		// - stitch the new and previous path data into a new prevPaths map
		// - report the latest delta token details
		colls, newPrevPaths, du, err := c.makeDriveCollections(
			ictx,
			drv,
			prevPathsByDriveID[driveID],
			cl,
			el.Local())
		if err != nil {
			el.AddRecoverable(ictx, clues.Stack(err))
			continue
		}

		// add all the freshly aggregated data into our results
		collections = append(collections, colls...)
		driveIDToNewPrevPaths[driveID] = newPrevPaths
		driveIDToNewDeltaLink[driveID] = du.URL

		// this drive is still in use, so we'd better not delete it.
		delete(driveTombstones, driveID)
	}

	if el.Failure() != nil {
		return nil, false, clues.Stack(el.Failure())
	}

	alertIfPrevPathsHaveCollisions(ctx, driveIDToNewPrevPaths, c.counter, errs)

	// clean up any drives that have been deleted since the last backup.
	dts, err := c.makeDriveTombstones(ctx, driveTombstones, errs)
	if err != nil {
		return nil, false, clues.Stack(err)
	}

	collections = append(collections, dts...)

	// persist our updated metadata for use on the next backup
	colls := c.makeMetadataCollections(
		ctx,
		driveIDToNewDeltaLink,
		driveIDToNewPrevPaths)

	collections = append(collections, colls...)

	logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))

	return collections, canUsePrevBackup, nil
}

func (c *Collections) makeDriveCollections(
	ctx context.Context,
	d models.Driveable,
	prevPaths map[string]string,
	counter *count.Bus,
	errs *fault.Bus,
) ([]data.BackupCollection, map[string]string, pagers.DeltaUpdate, error) {
	cl := c.counter.Local()

	cl.Add(count.PrevPaths, int64(len(prevPaths)))
	logger.Ctx(ctx).Infow(
		"previous metadata for drive",
		"count_old_prev_paths", len(prevPaths))

	// TODO(keepers): leaving this code around for now as a guide
	// while implementation progresses.

	// --- pager aggregation

	// du, newPrevPaths, err := c.PopulateDriveCollections(
	// 	ctx,
	// 	d,
	// 	tree,
	// 	cl.Local(),
	// 	errs)
	// if err != nil {
	// 	return nil, false, clues.Stack(err)
	// }

	// numDriveItems := c.NumItems - numPrevItems
	// numPrevItems = c.NumItems

	// cl.Add(count.NewPrevPaths, int64(len(newPrevPaths)))

	// --- prev path incorporation

	// For both cases we don't need to do set difference on folder map if the
	// delta token was valid because we should see all the changes.
	// if !du.Reset {
	// 	if len(excludedItemIDs) == 0 {
	// 		continue
	// 	}

	// 	p, err := c.handler.CanonicalPath(odConsts.DriveFolderPrefixBuilder(driveID), c.tenantID)
	// 	if err != nil {
	// 		return nil, false, clues.WrapWC(ictx, err, "making exclude prefix")
	// 	}

	// 	ssmb.Add(p.String(), excludedItemIDs)

	// 	continue
	// }

	// Set all folders in previous backup but not in the current one with state
	// deleted. Need to compare by ID because it's possible to make new folders
	// with the same path as deleted old folders. We shouldn't merge items or
	// subtrees if that happens though.

	// --- post-processing

	// Attach a URL cache to the drive if the number of discovered items is
	// below the threshold. Attaching the cache to larger drives can cause
	// performance issues, since cache delta queries start taking up the majority
	// of the hour that the refreshed URLs are valid for.

	// if numDriveItems < urlCacheDriveItemThreshold {
	// 	logger.Ctx(ictx).Infow(
	// 		"adding url cache for drive",
	// 		"num_drive_items", numDriveItems)

	// 	uc, err := newURLCache(
	// 		driveID,
	// 		prevDeltaLink,
	// 		urlCacheRefreshInterval,
	// 		c.handler,
	// 		cl,
	// 		errs)
	// 	if err != nil {
	// 		return nil, false, clues.Stack(err)
	// 	}

	// 	// Set the URL cache instance for all collections in this drive.
	// 	for id := range c.CollectionMap[driveID] {
	// 		c.CollectionMap[driveID][id].urlCache = uc
	// 	}
	// }

	return nil, nil, pagers.DeltaUpdate{}, clues.New("not yet implemented")
}

// quality-of-life wrapper that transforms each tombstone in the map
// into a backup collection that marks the backup as deleted.
func (c *Collections) makeDriveTombstones(
	ctx context.Context,
	driveTombstones map[string]struct{},
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	c.counter.Add(count.DriveTombstones, int64(len(driveTombstones)))

	var (
		colls = make([]data.BackupCollection, 0, len(driveTombstones))
		el    = errs.Local()
	)

	// generate tombstones for drives that were removed.
	for driveID := range driveTombstones {
		if el.Failure() != nil {
			break
		}

		prevDrivePath, err := c.handler.PathPrefix(c.tenantID, driveID)
		if err != nil {
			err = clues.WrapWC(ctx, err, "making drive tombstone for previous path").Label(count.BadPathPrefix)
			el.AddRecoverable(ctx, err)

			continue
		}

		// TODO: call NewTombstoneCollection
		coll, err := NewCollection(
			c.handler,
			c.protectedResource,
			nil, // delete the drive
			prevDrivePath,
			driveID,
			c.statusUpdater,
			c.ctrl,
			false,
			true,
			nil,
			c.counter.Local())
		if err != nil {
			err = clues.WrapWC(ctx, err, "making drive tombstone")
			el.AddRecoverable(ctx, err)

			continue
		}

		colls = append(colls, coll)
	}

	return colls, el.Failure()
}

// quality-of-life wrapper that transforms the delta tokens and previous paths
// into backup collections for persistence.
func (c *Collections) makeMetadataCollections(
	ctx context.Context,
	deltaTokens map[string]string,
	prevPaths map[string]map[string]string,
) []data.BackupCollection {
	colls := []data.BackupCollection{}

	pathPrefix, err := c.handler.MetadataPathPrefix(c.tenantID)
	if err != nil {
		logger.CtxErr(ctx, err).Info("making metadata collection path prefixes")

		// It's safe to return here because the logic for starting an
		// incremental backup should eventually find that the metadata files are
		// empty/missing and default to a full backup.
		return colls
	}

	entries := []graph.MetadataCollectionEntry{
		graph.NewMetadataEntry(bupMD.DeltaURLsFileName, deltaTokens),
		graph.NewMetadataEntry(bupMD.PreviousPathFileName, prevPaths),
	}

	md, err := graph.MakeMetadataCollection(
		pathPrefix,
		entries,
		c.statusUpdater,
		c.counter.Local())
	if err != nil {
		logger.CtxErr(ctx, err).Info("making metadata collection for future incremental backups")

		// Technically it's safe to continue here because the logic for starting an
		// incremental backup should eventually find that the metadata files are
		// empty/missing and default to a full backup.
		return colls
	}

	return append(colls, md)
}
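The tombstone bookkeeping in getTree is a plain set difference over drive IDs: seed the set from the previous backup's paths, remove each drive the current enumeration re-discovers, and whatever remains is presumed deleted. A standalone sketch of the same pattern, with hypothetical drive IDs and no Graph calls:

package main

import "fmt"

func main() {
	// previous backup knew about two drives
	prevPathsByDriveID := map[string]map[string]string{
		"drive-1": {"folder-id": "/old/path"},
		"drive-2": {"folder-id": "/old/path"},
	}

	// the current enumeration only returned one of them
	currentDriveIDs := []string{"drive-1"}

	// every previously-known drive starts as a tombstone candidate
	driveTombstones := map[string]struct{}{}
	for driveID := range prevPathsByDriveID {
		driveTombstones[driveID] = struct{}{}
	}

	// drives that still enumerate are cleared from the candidate set
	for _, driveID := range currentDriveIDs {
		delete(driveTombstones, driveID)
	}

	for driveID := range driveTombstones {
		fmt.Println("tombstone:", driveID) // tombstone: drive-2
	}
}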
src/internal/m365/collection/drive/collections_tree_test.go (new file, 510 lines)
@@ -0,0 +1,510 @@
package drive

import (
	"testing"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"

	"github.com/alcionai/corso/src/internal/common/idname"
	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/data"
	dataMock "github.com/alcionai/corso/src/internal/data/mock"
	"github.com/alcionai/corso/src/internal/m365/service/onedrive/mock"
	"github.com/alcionai/corso/src/internal/m365/support"
	"github.com/alcionai/corso/src/internal/tester"
	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/count"
	countTD "github.com/alcionai/corso/src/pkg/count/testdata"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
	apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock"
)

// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------

func collWithMBH(mbh BackupHandler) *Collections {
	return NewCollections(
		mbh,
		tenant,
		idname.NewProvider(user, user),
		func(*support.ControllerOperationStatus) {},
		control.Options{ToggleFeatures: control.Toggles{
			UseDeltaTree: true,
		}},
		count.New())
}

func fullOrPrevPath(
	t *testing.T,
	coll data.BackupCollection,
) path.Path {
	var collPath path.Path

	if coll.State() != data.DeletedState {
		collPath = coll.FullPath()
	} else {
		collPath = coll.PreviousPath()
	}

	require.False(
		t,
		len(collPath.Elements()) < 4,
		"malformed or missing collection path")

	return collPath
}

func pagerForDrives(drives ...models.Driveable) *apiMock.Pager[models.Driveable] {
	return &apiMock.Pager[models.Driveable]{
		ToReturn: []apiMock.PagerResult[models.Driveable]{
			{Values: drives},
		},
	}
}

func makePrevMetadataColls(
	t *testing.T,
	mbh BackupHandler,
	previousPaths map[string]map[string]string,
) []data.RestoreCollection {
	pathPrefix, err := mbh.MetadataPathPrefix(tenant)
	require.NoError(t, err, clues.ToCore(err))

	prevDeltas := map[string]string{}

	for driveID := range previousPaths {
		prevDeltas[driveID] = idx(delta, "prev")
	}

	mdColl, err := graph.MakeMetadataCollection(
		pathPrefix,
		[]graph.MetadataCollectionEntry{
			graph.NewMetadataEntry(bupMD.DeltaURLsFileName, prevDeltas),
			graph.NewMetadataEntry(bupMD.PreviousPathFileName, previousPaths),
		},
		func(*support.ControllerOperationStatus) {},
		count.New())
	require.NoError(t, err, "creating metadata collection", clues.ToCore(err))

	return []data.RestoreCollection{
		dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: mdColl}),
	}
}

func compareMetadata(
	t *testing.T,
	mdColl data.Collection,
	expectDeltas map[string]string,
	expectPrevPaths map[string]map[string]string,
) {
	ctx, flush := tester.NewContext(t)
	defer flush()

	colls := []data.RestoreCollection{
		dataMock.NewUnversionedRestoreCollection(t, data.NoFetchRestoreCollection{Collection: mdColl}),
	}

	deltas, prevs, _, err := deserializeAndValidateMetadata(
		ctx,
		colls,
		count.New(),
		fault.New(true))
	require.NoError(t, err, "deserializing metadata", clues.ToCore(err))
	assert.Equal(t, expectDeltas, deltas, "delta urls")
	assert.Equal(t, expectPrevPaths, prevs, "previous paths")
}

// for comparisons done by collection state
type stateAssertion struct {
	itemIDs []string
	// should never get set by the user.
	// this flag gets flipped when calling assertions.compare.
	// any unseen collection will error on requireNoUnseenCollections
	sawCollection bool
}

// for comparisons done by a given collection path
type collectionAssertion struct {
	doNotMerge    assert.BoolAssertionFunc
	states        map[data.CollectionState]*stateAssertion
	excludedItems map[string]struct{}
}

type statesToItemIDs map[data.CollectionState][]string

// TODO(keepers): move excludeItems to a more global position.
func newCollAssertion(
	doNotMerge bool,
	itemsByState statesToItemIDs,
	excludeItems ...string,
) collectionAssertion {
	states := map[data.CollectionState]*stateAssertion{}

	for state, itemIDs := range itemsByState {
		states[state] = &stateAssertion{
			itemIDs: itemIDs,
		}
	}

	dnm := assert.False
	if doNotMerge {
		dnm = assert.True
	}

	return collectionAssertion{
		doNotMerge:    dnm,
		states:        states,
		excludedItems: makeExcludeMap(excludeItems...),
	}
}

// to aggregate all collection-related expectations in the backup
// map collection path -> collection state -> assertion
type collectionAssertions map[string]collectionAssertion

// ensure the provided collection matches expectations as set by the test.
func (cas collectionAssertions) compare(
	t *testing.T,
	coll data.BackupCollection,
	excludes *prefixmatcher.StringSetMatchBuilder,
) {
	ctx, flush := tester.NewContext(t)
	defer flush()

	var (
		itemCh  = coll.Items(ctx, fault.New(true))
		itemIDs = []string{}
	)

	p := fullOrPrevPath(t, coll)

	for itm := range itemCh {
		itemIDs = append(itemIDs, itm.ID())
	}

	expect := cas[p.String()]
	expectState := expect.states[coll.State()]
	expectState.sawCollection = true

	assert.ElementsMatchf(
		t,
		expectState.itemIDs,
		itemIDs,
		"expected all items to match in collection with:\nstate %q\npath %q",
		coll.State(),
		p)

	expect.doNotMerge(
		t,
		coll.DoNotMergeItems(),
		"expected collection to have the appropriate doNotMerge flag")

	if result, ok := excludes.Get(p.String()); ok {
		assert.Equal(
			t,
			expect.excludedItems,
			result,
			"excluded items")
	}
}

// ensure that no collections in the expected set are still flagged
// as sawCollection == false.
func (cas collectionAssertions) requireNoUnseenCollections(
	t *testing.T,
) {
	for p, withPath := range cas {
		for _, state := range withPath.states {
			require.True(
				t,
				state.sawCollection,
				"results should have contained collection:\n\t%q\t\n%q",
				state, p)
		}
	}
}

// ---------------------------------------------------------------------------
// tests
// ---------------------------------------------------------------------------

type CollectionsTreeUnitSuite struct {
	tester.Suite
}

func TestCollectionsTreeUnitSuite(t *testing.T) {
	suite.Run(t, &CollectionsTreeUnitSuite{Suite: tester.NewUnitSuite(t)})
}

func (suite *CollectionsTreeUnitSuite) TestCollections_MakeDriveTombstones() {
	badPfxMBH := mock.DefaultOneDriveBH(user)
	badPfxMBH.PathPrefixErr = assert.AnError

	twostones := map[string]struct{}{
		"t1": {},
		"t2": {},
	}

	table := []struct {
		name       string
		tombstones map[string]struct{}
		c          *Collections
		expectErr  assert.ErrorAssertionFunc
		expect     assert.ValueAssertionFunc
	}{
		{
			name:       "nil",
			tombstones: nil,
			c:          collWithMBH(mock.DefaultOneDriveBH(user)),
			expectErr:  assert.NoError,
			expect:     assert.Empty,
		},
		{
			name:       "none",
			tombstones: map[string]struct{}{},
			c:          collWithMBH(mock.DefaultOneDriveBH(user)),
			expectErr:  assert.NoError,
			expect:     assert.Empty,
		},
		{
			name:       "some tombstones",
			tombstones: twostones,
			c:          collWithMBH(mock.DefaultOneDriveBH(user)),
			expectErr:  assert.NoError,
			expect:     assert.NotEmpty,
		},
		{
			name:       "bad prefix path",
			tombstones: twostones,
			c:          collWithMBH(badPfxMBH),
			expectErr:  assert.Error,
			expect:     assert.Empty,
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			colls, err := test.c.makeDriveTombstones(ctx, test.tombstones, fault.New(true))
			test.expectErr(t, err, clues.ToCore(err))
			test.expect(t, colls)

			for _, coll := range colls {
				assert.Equal(t, data.DeletedState, coll.State(), "tombstones should always delete data")
			}
		})
	}
}

func (suite *CollectionsTreeUnitSuite) TestCollections_MakeMetadataCollections() {
	badMetaPfxMBH := mock.DefaultOneDriveBH(user)
	badMetaPfxMBH.MetadataPathPrefixErr = assert.AnError

	table := []struct {
		name   string
		c      *Collections
		expect assert.ValueAssertionFunc
	}{
		{
			name:   "no errors",
			c:      collWithMBH(mock.DefaultOneDriveBH(user)),
			expect: assert.NotEmpty,
		},
		{
			name:   "bad prefix path",
			c:      collWithMBH(badMetaPfxMBH),
			expect: assert.Empty,
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			var (
				t           = suite.T()
				deltaTokens = map[string]string{}
				prevPaths   = map[string]map[string]string{}
			)

			ctx, flush := tester.NewContext(t)
			defer flush()

			colls := test.c.makeMetadataCollections(ctx, deltaTokens, prevPaths)
			test.expect(t, colls)

			for _, coll := range colls {
				assert.NotEqual(t, data.DeletedState, coll.State(), "metadata is never deleted")
			}
		})
	}
}

// TODO(keepers): implement tree version of populateDriveCollections tests

// TODO(keepers): implement tree version of TestGet single-drive tests

func (suite *CollectionsTreeUnitSuite) TestCollections_MakeDriveCollections() {
	drive1 := models.NewDrive()
	drive1.SetId(ptr.To(idx(drive, 1)))
	drive1.SetName(ptr.To(namex(drive, 1)))

	table := []struct {
		name         string
		c            *Collections
		drive        models.Driveable
		prevPaths    map[string]string
		expectErr    require.ErrorAssertionFunc
		expectCounts countTD.Expected
	}{
		{
			name:      "not yet implemented",
			c:         collWithMBH(mock.DefaultOneDriveBH(user)),
			drive:     drive1,
			expectErr: require.Error,
			expectCounts: countTD.Expected{
				count.PrevPaths: 0,
			},
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			colls, paths, delta, err := test.c.makeDriveCollections(
				ctx,
				test.drive,
				test.prevPaths,
				test.c.counter,
				fault.New(true))

			// TODO(keepers): awaiting implementation
			test.expectErr(t, err, clues.ToCore(err))
			assert.Empty(t, colls)
			assert.Empty(t, paths)
			assert.Empty(t, delta.URL)

			test.expectCounts.Compare(t, test.c.counter)
		})
	}
}

// TODO(keepers): implement tree version of TestGet multi-drive tests

func (suite *CollectionsTreeUnitSuite) TestCollections_GetTree() {
	metadataPath, err := path.BuildMetadata(
		tenant,
		user,
		path.OneDriveService,
		path.FilesCategory,
		false)
	require.NoError(suite.T(), err, "making metadata path", clues.ToCore(err))

	drive1 := models.NewDrive()
	drive1.SetId(ptr.To(idx(drive, 1)))
	drive1.SetName(ptr.To(namex(drive, 1)))

	type expected struct {
		canUsePrevBackup assert.BoolAssertionFunc
		collAssertions   collectionAssertions
		counts           countTD.Expected
		deltas           map[string]string
		err              require.ErrorAssertionFunc
		prevPaths        map[string]map[string]string
		skips            int
	}

	table := []struct {
		name          string
		drivePager    *apiMock.Pager[models.Driveable]
		enumerator    mock.EnumerateItemsDeltaByDrive
		previousPaths map[string]map[string]string

		metadata []data.RestoreCollection
		expect   expected
	}{
		{
			name:       "not yet implemented",
			drivePager: pagerForDrives(drive1),
			expect: expected{
				canUsePrevBackup: assert.False,
				collAssertions: collectionAssertions{
					fullPath(1): newCollAssertion(
						doNotMergeItems,
						statesToItemIDs{data.NotMovedState: {}},
						id(file)),
				},
				counts: countTD.Expected{
					count.PrevPaths: 0,
				},
				deltas:    map[string]string{},
				err:       require.Error,
				prevPaths: map[string]map[string]string{},
				skips:     0,
			},
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			var (
				mbh            = mock.DefaultDriveBHWith(user, test.drivePager, test.enumerator)
				c              = collWithMBH(mbh)
				prevMetadata   = makePrevMetadataColls(t, mbh, test.previousPaths)
				globalExcludes = prefixmatcher.NewStringSetBuilder()
				errs           = fault.New(true)
			)

			colls, canUsePrevBackup, err := c.getTree(
				ctx,
				prevMetadata,
				globalExcludes,
				errs)

			test.expect.err(t, err, clues.ToCore(err))
			// TODO(keepers): awaiting implementation
			assert.Empty(t, colls)
			assert.Equal(t, test.expect.skips, len(errs.Skipped()))
			test.expect.canUsePrevBackup(t, canUsePrevBackup)
			test.expect.counts.Compare(t, c.counter)

			if err != nil {
				return
			}

			for _, coll := range colls {
				collPath := fullOrPrevPath(t, coll)

				if collPath.String() == metadataPath.String() {
					compareMetadata(
						t,
						coll,
						test.expect.deltas,
						test.expect.prevPaths)

					continue
				}

				test.expect.collAssertions.compare(t, coll, globalExcludes)
			}

			test.expect.collAssertions.requireNoUnseenCollections(t)
		})
	}
}
src/internal/m365/collection/drive/limiter.go (new file, 95 lines)
@@ -0,0 +1,95 @@
package drive

import "github.com/alcionai/corso/src/pkg/control"

// used to mark an unused variable while we transition handling.
const ignoreMe = -1

type driveEnumerationStats struct {
	numPages      int
	numAddedFiles int
	numContainers int
	numBytes      int64
}

func newPagerLimiter(opts control.Options) *pagerLimiter {
	res := &pagerLimiter{limits: opts.PreviewLimits}

	if res.limits.MaxContainers == 0 {
		res.limits.MaxContainers = defaultPreviewMaxContainers
	}

	if res.limits.MaxItemsPerContainer == 0 {
		res.limits.MaxItemsPerContainer = defaultPreviewMaxItemsPerContainer
	}

	if res.limits.MaxItems == 0 {
		res.limits.MaxItems = defaultPreviewMaxItems
	}

	if res.limits.MaxBytes == 0 {
		res.limits.MaxBytes = defaultPreviewMaxBytes
	}

	if res.limits.MaxPages == 0 {
		res.limits.MaxPages = defaultPreviewMaxPages
	}

	return res
}

type pagerLimiter struct {
	limits control.PreviewItemLimits
}

func (l pagerLimiter) effectiveLimits() control.PreviewItemLimits {
	return l.limits
}

func (l pagerLimiter) enabled() bool {
	return l.limits.Enabled
}

// sizeLimit returns the total number of bytes this backup should try to
// contain.
func (l pagerLimiter) sizeLimit() int64 {
	return l.limits.MaxBytes
}

// atItemLimit returns true if the limiter is enabled and has reached the limit
// for individual items added to collections for this backup.
func (l pagerLimiter) atItemLimit(stats *driveEnumerationStats) bool {
	return l.enabled() &&
		(stats.numAddedFiles >= l.limits.MaxItems ||
			stats.numBytes >= l.limits.MaxBytes)
}

// atContainerItemsLimit returns true if the limiter is enabled and the current
// number of items is above the limit for the number of items for a container
// for this backup.
func (l pagerLimiter) atContainerItemsLimit(numItems int) bool {
	return l.enabled() && numItems >= l.limits.MaxItemsPerContainer
}

// atPageLimit returns true if the limiter is enabled and the number of
// pages processed so far is beyond the limit for this backup.
func (l pagerLimiter) atPageLimit(stats *driveEnumerationStats) bool {
	return l.enabled() && stats.numPages >= l.limits.MaxPages
}

// atLimit returns true if the limiter is enabled and meets any of the
// conditions for max items, containers, etc for this backup.
func (l pagerLimiter) atLimit(
	stats *driveEnumerationStats,
	containerCount int,
) bool {
	nc := stats.numContainers
	if nc == 0 && containerCount > 0 {
		nc = containerCount
	}

	return l.enabled() &&
		(l.atItemLimit(stats) ||
			nc >= l.limits.MaxContainers ||
			stats.numPages >= l.limits.MaxPages)
}
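Since pagerLimiter and its default thresholds are unexported, here is a simplified standalone mirror of the limit check for illustration only. The field names follow control.PreviewItemLimits, the threshold values are invented, and the containerCount fallback (ignoreMe) is omitted.

package main

import "fmt"

// previewLimits stands in for control.PreviewItemLimits in this sketch.
type previewLimits struct {
	Enabled              bool
	MaxItems             int
	MaxItemsPerContainer int
	MaxContainers        int
	MaxPages             int
	MaxBytes             int64
}

// enumStats mirrors driveEnumerationStats above.
type enumStats struct {
	numPages      int
	numAddedFiles int
	numContainers int
	numBytes      int64
}

// atLimit reports whether any enabled ceiling has been reached.
func atLimit(l previewLimits, s enumStats) bool {
	return l.Enabled &&
		(s.numAddedFiles >= l.MaxItems ||
			s.numBytes >= l.MaxBytes ||
			s.numContainers >= l.MaxContainers ||
			s.numPages >= l.MaxPages)
}

func main() {
	limits := previewLimits{Enabled: true, MaxItems: 100, MaxContainers: 5, MaxPages: 10, MaxBytes: 1 << 20}
	stats := enumStats{numAddedFiles: 100}

	// Enumeration would stop here: the item ceiling has been reached.
	fmt.Println("stop enumerating:", atLimit(limits, stats))
}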
src/internal/m365/collection/drive/limiter_test.go (new file, 1010 lines; diff suppressed because it is too large)
@@ -15,6 +15,7 @@ import (
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/selectors"
	"github.com/alcionai/corso/src/pkg/services/m365/api"
	apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock"
	"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
)

@@ -106,6 +107,18 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
	}
}

func DefaultDriveBHWith(
	resource string,
	drivePager *apiMock.Pager[models.Driveable],
	enumerator EnumerateItemsDeltaByDrive,
) *BackupHandler[models.DriveItemable] {
	mbh := DefaultOneDriveBH(resource)
	mbh.DrivePagerV = drivePager
	mbh.DriveItemEnumeration = enumerator

	return mbh
}

func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
	pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID)
	if err != nil {

@@ -121,7 +134,7 @@ func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
		return nil, err
	}

	return pp, h.PathPrefixErr
	return pp, h.MetadataPathPrefixErr
}

func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
src/pkg/count/testdata/count.go (new file, 30 lines)
@@ -0,0 +1,30 @@
package testdata

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/alcionai/corso/src/pkg/count"
)

type Expected map[count.Key]int64

func (e Expected) Compare(
	t *testing.T,
	bus *count.Bus,
) {
	vs := bus.Values()
	results := map[count.Key]int64{}

	for k := range e {
		results[k] = bus.Get(k)
		delete(vs, string(k))
	}

	for k, v := range vs {
		t.Logf("unchecked count %q: %d", k, v)
	}

	assert.Equal(t, e, Expected(results))
}
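A sketch of how the new helper is exercised by the tree tests above, assuming an external test package; count.New, Bus.Add, and the count.Drives key all appear elsewhere in this commit.

package testdata_test

import (
	"testing"

	"github.com/alcionai/corso/src/pkg/count"
	countTD "github.com/alcionai/corso/src/pkg/count/testdata"
)

// TestExpectedCompare records a count on a bus, then asserts the exact value.
// Keys present on the bus but absent from the expectation map are only logged.
func TestExpectedCompare(t *testing.T) {
	bus := count.New()
	bus.Add(count.Drives, 2)

	expected := countTD.Expected{
		count.Drives: 2,
	}
	expected.Compare(t, bus)
}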