add counter for skip-collision counting (#3722)

Introduces a counting bus, and threads it into restore operations so that we can count the number of
collision skips that occur.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Issue(s)

* #3562

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-07-06 13:22:02 -06:00 committed by GitHub
parent f58cc90958
commit 2a150cc610
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 670 additions and 185 deletions

View File

@ -1,18 +1,11 @@
package restore
import (
"github.com/alcionai/clues"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/alcionai/corso/src/cli/flags"
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/repo"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/path"
)
// called by restore.go to map subcommands to provider-specific handling.
@ -94,33 +87,14 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error {
return err
}
r, _, _, err := utils.GetAccountAndConnect(ctx, path.ExchangeService, repo.S3Overrides(cmd))
if err != nil {
return Only(ctx, err)
}
defer utils.CloseRepo(ctx, r)
restoreCfg := utils.MakeRestoreConfig(ctx, opts.RestoreCfg, dttm.HumanReadable)
sel := utils.IncludeExchangeRestoreDataSelectors(opts)
utils.FilterExchangeRestoreInfoSelectors(sel, opts)
ro, err := r.NewRestore(ctx, flags.BackupIDFV, sel.Selector, restoreCfg)
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to initialize Exchange restore"))
}
ds, err := ro.Run(ctx)
if err != nil {
if errors.Is(err, data.ErrNotFound) {
return Only(ctx, clues.New("Backup or backup details missing for id "+flags.BackupIDFV))
}
return Only(ctx, clues.Wrap(err, "Failed to run Exchange restore"))
}
ds.Items().MaybePrintEntries(ctx)
return nil
return runRestore(
ctx,
cmd,
opts.RestoreCfg,
sel.Selector,
flags.BackupIDFV,
"Exchange")
}

View File

@ -1,18 +1,12 @@
package restore
import (
"github.com/alcionai/clues"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/alcionai/corso/src/cli/flags"
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/repo"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/path"
)
// called by restore.go to map subcommands to provider-specific handling.
@ -84,6 +78,7 @@ func restoreOneDriveCmd(cmd *cobra.Command, args []string) error {
}
opts := utils.MakeOneDriveOpts(cmd)
opts.RestoreCfg.DTTMFormat = dttm.HumanReadableDriveItem
if flags.RunModeFV == flags.RunModeFlagTest {
return nil
@ -93,33 +88,14 @@ func restoreOneDriveCmd(cmd *cobra.Command, args []string) error {
return err
}
r, _, _, err := utils.GetAccountAndConnect(ctx, path.OneDriveService, repo.S3Overrides(cmd))
if err != nil {
return Only(ctx, err)
}
defer utils.CloseRepo(ctx, r)
sel := utils.IncludeOneDriveRestoreDataSelectors(opts)
utils.FilterOneDriveRestoreInfoSelectors(sel, opts)
restoreCfg := utils.MakeRestoreConfig(ctx, opts.RestoreCfg, dttm.HumanReadableDriveItem)
ro, err := r.NewRestore(ctx, flags.BackupIDFV, sel.Selector, restoreCfg)
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to initialize OneDrive restore"))
}
ds, err := ro.Run(ctx)
if err != nil {
if errors.Is(err, data.ErrNotFound) {
return Only(ctx, clues.New("Backup or backup details missing for id "+flags.BackupIDFV))
}
return Only(ctx, clues.Wrap(err, "Failed to run OneDrive restore"))
}
ds.Items().MaybePrintEntries(ctx)
return nil
return runRestore(
ctx,
cmd,
opts.RestoreCfg,
sel.Selector,
flags.BackupIDFV,
"OneDrive")
}

View File

@ -1,7 +1,19 @@
package restore
import (
"context"
"github.com/alcionai/clues"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/alcionai/corso/src/cli/flags"
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/repo"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/selectors"
)
var restoreCommands = []func(cmd *cobra.Command) *cobra.Command{
@ -39,3 +51,47 @@ func restoreCmd() *cobra.Command {
func handleRestoreCmd(cmd *cobra.Command, args []string) error {
return cmd.Help()
}
// ---------------------------------------------------------------------------
// common handlers
// ---------------------------------------------------------------------------
func runRestore(
ctx context.Context,
cmd *cobra.Command,
urco utils.RestoreCfgOpts,
sel selectors.Selector,
backupID, serviceName string,
) error {
r, _, _, err := utils.GetAccountAndConnect(ctx, sel.PathService(), repo.S3Overrides(cmd))
if err != nil {
return Only(ctx, err)
}
defer utils.CloseRepo(ctx, r)
ro, err := r.NewRestore(ctx, backupID, sel, utils.MakeRestoreConfig(ctx, urco))
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to initialize "+serviceName+" restore"))
}
ds, err := ro.Run(ctx)
if err != nil {
if errors.Is(err, data.ErrNotFound) {
return Only(ctx, clues.New("Backup or backup details missing for id "+flags.BackupIDFV))
}
return Only(ctx, clues.Wrap(err, "Failed to run "+serviceName+" restore"))
}
Info(ctx, "Completed Restore:")
skipped := ro.Counter.Get(count.CollisionSkip)
if skipped > 0 {
Infof(ctx, "Skipped %d items due to collision", skipped)
}
ds.Items().MaybePrintEntries(ctx)
return nil
}

View File

@ -1,18 +1,12 @@
package restore
import (
"github.com/alcionai/clues"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/alcionai/corso/src/cli/flags"
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/repo"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/path"
)
// called by restore.go to map subcommands to provider-specific handling.
@ -90,6 +84,7 @@ func restoreSharePointCmd(cmd *cobra.Command, args []string) error {
}
opts := utils.MakeSharePointOpts(cmd)
opts.RestoreCfg.DTTMFormat = dttm.HumanReadableDriveItem
if flags.RunModeFV == flags.RunModeFlagTest {
return nil
@ -99,33 +94,14 @@ func restoreSharePointCmd(cmd *cobra.Command, args []string) error {
return err
}
r, _, _, err := utils.GetAccountAndConnect(ctx, path.SharePointService, repo.S3Overrides(cmd))
if err != nil {
return Only(ctx, err)
}
defer utils.CloseRepo(ctx, r)
sel := utils.IncludeSharePointRestoreDataSelectors(ctx, opts)
utils.FilterSharePointRestoreInfoSelectors(sel, opts)
restoreCfg := utils.MakeRestoreConfig(ctx, opts.RestoreCfg, dttm.HumanReadableDriveItem)
ro, err := r.NewRestore(ctx, flags.BackupIDFV, sel.Selector, restoreCfg)
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to initialize SharePoint restore"))
}
ds, err := ro.Run(ctx)
if err != nil {
if errors.Is(err, data.ErrNotFound) {
return Only(ctx, clues.New("Backup or backup details missing for id "+flags.BackupIDFV))
}
return Only(ctx, clues.Wrap(err, "Failed to run SharePoint restore"))
}
ds.Items().MaybePrintEntries(ctx)
return nil
return runRestore(
ctx,
cmd,
opts.RestoreCfg,
sel.Selector,
flags.BackupIDFV,
"SharePoint")
}

View File

@ -15,6 +15,10 @@ import (
type RestoreCfgOpts struct {
Collisions string
Destination string
// DTTMFormat is the timestamp format appended
// to the default folder name. Defaults to
// dttm.HumanReadable.
DTTMFormat dttm.TimeFormat
Populated flags.PopulatedFlags
}
@ -23,6 +27,7 @@ func makeRestoreCfgOpts(cmd *cobra.Command) RestoreCfgOpts {
return RestoreCfgOpts{
Collisions: flags.CollisionsFV,
Destination: flags.DestinationFV,
DTTMFormat: dttm.HumanReadable,
// populated contains the list of flags that appear in the
// command, according to pflags. Use this to differentiate
@ -47,9 +52,12 @@ func validateRestoreConfigFlags(fv string, opts RestoreCfgOpts) error {
func MakeRestoreConfig(
ctx context.Context,
opts RestoreCfgOpts,
locationTimeFormat dttm.TimeFormat,
) control.RestoreConfig {
restoreCfg := control.DefaultRestoreConfig(locationTimeFormat)
if len(opts.DTTMFormat) == 0 {
opts.DTTMFormat = dttm.HumanReadable
}
restoreCfg := control.DefaultRestoreConfig(opts.DTTMFormat)
if _, ok := opts.Populated[flags.CollisionsFN]; ok {
restoreCfg.OnCollision = control.CollisionPolicy(opts.Collisions)

View File

@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/cli/flags"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
)
@ -129,7 +128,7 @@ func (suite *RestoreCfgUnitSuite) TestMakeRestoreConfig() {
opts := *rco
opts.Populated = test.populated
result := MakeRestoreConfig(ctx, opts, dttm.HumanReadable)
result := MakeRestoreConfig(ctx, opts)
assert.Equal(t, test.expect.OnCollision, result.OnCollision)
assert.Contains(t, result.Location, test.expect.Location)
})

View File

@ -71,8 +71,8 @@ func AccountConnectAndWriteRepoConfig(
return nil, nil, err
}
// repo config is already set while repo connect and init. This is just to confirm correct values.
// So won't fail is the write fails
// repo config gets set during repo connect and init.
// This call confirms we have the correct values.
err = config.WriteRepoConfig(ctx, s3Config, m365Config, r.GetID())
if err != nil {
logger.CtxErr(ctx, err).Info("writing to repository configuration")

View File

@ -27,6 +27,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/credentials"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -63,6 +64,7 @@ func generateAndRestoreItems(
dbf dataBuilderFunc,
opts control.Options,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, error) {
items := make([]item, 0, howMany)
@ -102,7 +104,7 @@ func generateAndRestoreItems(
print.Infof(ctx, "Generating %d %s items in %s\n", howMany, cat, Destination)
return ctrl.ConsumeRestoreCollections(ctx, version.Backup, sel, restoreCfg, opts, dataColls, errs)
return ctrl.ConsumeRestoreCollections(ctx, version.Backup, sel, restoreCfg, opts, dataColls, errs, ctr)
}
// ------------------------------------------------------------------------------------------
@ -220,8 +222,9 @@ func generateAndRestoreDriveItems(
cat path.CategoryType,
sel selectors.Selector,
tenantID, destFldr string,
count int,
intCount int,
errs *fault.Bus,
ctr *count.Bus,
) (
*details.Details,
error,
@ -266,7 +269,7 @@ func generateAndRestoreDriveItems(
currentTime = fmt.Sprintf("%d-%v-%d-%d-%d-%d", year, mnth, date, hour, min, sec)
)
for i := 0; i < count; i++ {
for i := 0; i < intCount; i++ {
col := []odStub.ColInfo{
// basic folder and file creation
{
@ -426,5 +429,5 @@ func generateAndRestoreDriveItems(
return nil, err
}
return ctrl.ConsumeRestoreCollections(ctx, version.Backup, sel, restoreCfg, opts, collections, errs)
return ctrl.ConsumeRestoreCollections(ctx, version.Backup, sel, restoreCfg, opts, collections, errs, ctr)
}

View File

@ -8,6 +8,7 @@ import (
exchMock "github.com/alcionai/corso/src/internal/m365/exchange/mock"
"github.com/alcionai/corso/src/internal/m365/resource"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -72,7 +73,8 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error {
now, now, now, now)
},
control.Defaults(),
errs)
errs,
count.New())
if err != nil {
return Only(ctx, err)
}
@ -120,7 +122,8 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error
exchMock.NoExceptionOccurrences)
},
control.Defaults(),
errs)
errs,
count.New())
if err != nil {
return Only(ctx, err)
}
@ -170,7 +173,8 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error {
)
},
control.Defaults(),
errs)
errs,
count.New())
if err != nil {
return Only(ctx, err)
}

View File

@ -8,6 +8,7 @@ import (
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/m365/resource"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -56,7 +57,8 @@ func handleOneDriveFileFactory(cmd *cobra.Command, args []string) error {
Tenant,
Destination,
Count,
errs)
errs,
count.New())
if err != nil {
return Only(ctx, err)
}

View File

@ -8,6 +8,7 @@ import (
. "github.com/alcionai/corso/src/cli/print"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/m365/resource"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -56,7 +57,8 @@ func handleSharePointLibraryFileFactory(cmd *cobra.Command, args []string) error
Tenant,
Destination,
Count,
errs)
errs,
count.New())
if err != nil {
return Only(ctx, err)
}

View File

@ -22,6 +22,7 @@ require (
github.com/microsoftgraph/msgraph-sdk-go v1.4.0
github.com/microsoftgraph/msgraph-sdk-go-core v1.0.0
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v2 v2.4.1
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1
github.com/spf13/cobra v1.7.0

View File

@ -345,6 +345,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/puzpuzpuz/xsync/v2 v2.4.1 h1:aGdE1C/HaR/QC6YAFdtZXi60Df8/qBIrs8PKrzkItcM=
github.com/puzpuzpuz/xsync/v2 v2.4.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=

View File

@ -385,7 +385,7 @@ func (w *conn) writePolicy(
ctx = clues.Add(ctx, "source_info", si)
writeOpts := repo.WriteSessionOptions{Purpose: purpose}
cb := func(innerCtx context.Context, rw repo.RepositoryWriter) error {
ctr := func(innerCtx context.Context, rw repo.RepositoryWriter) error {
if err := policy.SetPolicy(ctx, rw, si, p); err != nil {
return clues.Stack(err).WithClues(innerCtx)
}
@ -393,7 +393,7 @@ func (w *conn) writePolicy(
return nil
}
if err := repo.WriteSession(ctx, w.Repository, writeOpts, cb); err != nil {
if err := repo.WriteSession(ctx, w.Repository, writeOpts, ctr); err != nil {
return clues.Wrap(err, "updating policy").WithClues(ctx)
}

View File

@ -506,11 +506,11 @@ func (ms *ModelStore) DeleteWithModelStoreID(ctx context.Context, id manifest.ID
}
opts := repo.WriteSessionOptions{Purpose: "ModelStoreDelete"}
cb := func(innerCtx context.Context, w repo.RepositoryWriter) error {
ctr := func(innerCtx context.Context, w repo.RepositoryWriter) error {
return w.DeleteManifest(innerCtx, id)
}
if err := repo.WriteSession(ctx, ms.c, opts, cb); err != nil {
if err := repo.WriteSession(ctx, ms.c, opts, ctr); err != nil {
return clues.Wrap(err, "deleting model").WithClues(ctx)
}

View File

@ -302,7 +302,7 @@ func (cp *corsoProgress) get(k string) *itemDetails {
func collectionEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
ctr func(context.Context, fs.Entry) error,
streamedEnts data.BackupCollection,
progress *corsoProgress,
) (map[string]struct{}, error) {
@ -399,7 +399,7 @@ func collectionEntries(
modTime,
newBackupStreamReader(serializationVersion, e.ToReader()))
err = cb(ctx, entry)
err = ctr(ctx, entry)
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
@ -411,7 +411,7 @@ func collectionEntries(
func streamBaseEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
ctr func(context.Context, fs.Entry) error,
curPath path.Path,
prevPath path.Path,
locationPath *path.Builder,
@ -501,7 +501,7 @@ func streamBaseEntries(
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
if err := cb(ctx, entry); err != nil {
if err := ctr(ctx, entry); err != nil {
return clues.Wrap(err, "executing callback on item").With("item_path", itemPath)
}
@ -527,13 +527,13 @@ func getStreamItemFunc(
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error {
ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc")
defer end()
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := cb(ctx, d); err != nil {
if err := ctr(ctx, d); err != nil {
return clues.Wrap(err, "executing callback on static directory").WithClues(ctx)
}
}
@ -544,14 +544,14 @@ func getStreamItemFunc(
locationPath = lp.LocationPath()
}
seen, err := collectionEntries(ctx, cb, streamedEnts, progress)
seen, err := collectionEntries(ctx, ctr, streamedEnts, progress)
if err != nil {
return clues.Wrap(err, "streaming collection entries")
}
if err := streamBaseEntries(
ctx,
cb,
ctr,
curPath,
prevPath,
locationPath,

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -314,7 +315,8 @@ func (suite *ControllerIntegrationSuite) TestRestoreFailsBadService() {
ToggleFeatures: control.Toggles{},
},
nil,
fault.New(true))
fault.New(true),
count.New())
assert.Error(t, err, clues.ToCore(err))
assert.NotNil(t, deets)
@ -392,7 +394,8 @@ func (suite *ControllerIntegrationSuite) TestEmptyCollections() {
ToggleFeatures: control.Toggles{},
},
test.col,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, deets)
@ -432,7 +435,8 @@ func runRestore(
sci.RestoreCfg,
sci.Opts,
collections,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, deets)
@ -1042,7 +1046,8 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
ToggleFeatures: control.Toggles{},
},
collections,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, deets)

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -71,6 +72,7 @@ func (h contactRestoreHandler) restore(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
return restoreContact(
ctx,
@ -79,7 +81,8 @@ func (h contactRestoreHandler) restore(
userID, destinationID,
collisionKeyToItemID,
collisionPolicy,
errs)
errs,
ctr)
}
type contactRestorer interface {
@ -95,6 +98,7 @@ func restoreContact(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
contact, err := api.BytesToContactable(body)
if err != nil {
@ -114,7 +118,9 @@ func restoreContact(
log.Debug("item collision")
if collisionPolicy == control.Skip {
ctr.Inc(count.CollisionSkip)
log.Debug("skipping item with collision")
return nil, graph.ErrItemAlreadyExistsConflict
}

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -191,7 +192,8 @@ func (suite *ContactsRestoreIntgSuite) TestRestoreContact() {
"destination",
test.collisionMap,
test.onCollision,
fault.New(true))
fault.New(true),
count.New())
test.expectErr(t, err)
test.expectMock(t, test.apiMock)

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -72,6 +73,7 @@ func (h eventRestoreHandler) restore(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
return restoreEvent(
ctx,
@ -80,7 +82,8 @@ func (h eventRestoreHandler) restore(
userID, destinationID,
collisionKeyToItemID,
collisionPolicy,
errs)
errs,
ctr)
}
type eventRestorer interface {
@ -96,6 +99,7 @@ func restoreEvent(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
event, err := api.BytesToEventable(body)
if err != nil {
@ -115,7 +119,9 @@ func restoreEvent(
log.Debug("item collision")
if collisionPolicy == control.Skip {
ctr.Inc(count.CollisionSkip)
log.Debug("skipping item with collision")
return nil, graph.ErrItemAlreadyExistsConflict
}

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -239,7 +240,8 @@ func (suite *EventsRestoreIntgSuite) TestRestoreEvent() {
"destination",
test.collisionMap,
test.onCollision,
fault.New(true))
fault.New(true),
count.New())
test.expectErr(t, err)
test.expectMock(t, test.apiMock)

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -80,6 +81,7 @@ type itemRestorer interface {
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -76,6 +77,7 @@ func (h mailRestoreHandler) restore(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
return restoreMail(
ctx,
@ -84,7 +86,8 @@ func (h mailRestoreHandler) restore(
userID, destinationID,
collisionKeyToItemID,
collisionPolicy,
errs)
errs,
ctr)
}
type mailRestorer interface {
@ -101,6 +104,7 @@ func restoreMail(
collisionKeyToItemID map[string]string,
collisionPolicy control.CollisionPolicy,
errs *fault.Bus,
ctr *count.Bus,
) (*details.ExchangeInfo, error) {
msg, err := api.BytesToMessageable(body)
if err != nil {
@ -120,7 +124,9 @@ func restoreMail(
log.Debug("item collision")
if collisionPolicy == control.Skip {
ctr.Inc(count.CollisionSkip)
log.Debug("skipping item with collision")
return nil, graph.ErrItemAlreadyExistsConflict
}

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -208,7 +209,8 @@ func (suite *MailRestoreIntgSuite) TestRestoreMail() {
"destination",
test.collisionMap,
test.onCollision,
fault.New(true))
fault.New(true),
count.New())
test.expectErr(t, err)
test.expectMock(t, test.apiMock)

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -31,6 +32,7 @@ func ConsumeRestoreCollections(
dcs []data.RestoreCollection,
deets *details.Builder,
errs *fault.Bus,
ctr *count.Bus,
) (*support.ControllerOperationStatus, error) {
if len(dcs) == 0 {
return support.CreateStatus(ctx, support.Restore, 0, support.CollectionMetrics{}, ""), nil
@ -103,7 +105,8 @@ func ConsumeRestoreCollections(
collisionKeyToItemID,
restoreCfg.OnCollision,
deets,
errs)
errs,
ctr.Local())
metrics = support.CombineMetrics(metrics, temp)
@ -136,6 +139,7 @@ func restoreCollection(
collisionPolicy control.CollisionPolicy,
deets *details.Builder,
errs *fault.Bus,
ctr *count.Bus,
) (support.CollectionMetrics, error) {
ctx, end := diagnostics.Span(ctx, "m365:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath()))
defer end()
@ -185,7 +189,8 @@ func restoreCollection(
destinationID,
collisionKeyToItemID,
collisionPolicy,
errs)
errs,
ctr)
if err != nil {
if !graph.IsErrItemAlreadyExistsConflict(err) {
el.AddRecoverable(ictx, err)

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -78,7 +79,8 @@ func (suite *RestoreIntgSuite) TestRestoreContact() {
userID, folderID,
nil,
control.Copy,
fault.New(true))
fault.New(true),
count.New())
assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "contact item info")
}
@ -152,7 +154,8 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() {
userID, calendarID,
nil,
control.Copy,
fault.New(true))
fault.New(true),
count.New())
assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "event item info")
})
@ -380,7 +383,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
userID, destination,
nil,
control.Copy,
fault.New(true))
fault.New(true),
count.New())
assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "item info was not populated")
})
@ -413,7 +417,8 @@ func (suite *RestoreIntgSuite) TestRestoreAndBackupEvent_recurringInstancesWithA
userID, calendarID,
nil,
control.Copy,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "event item info")

View File

@ -384,22 +384,22 @@ func (suite *GraphErrorsUnitSuite) TestIsErrUnauthorized() {
func (suite *GraphErrorsUnitSuite) TestMalwareInfo() {
var (
i = models.NewDriveItem()
cb = models.NewUser()
cbID = "created-by"
lm = models.NewUser()
lmID = "last-mod-by"
ref = models.NewItemReference()
refCID = "container-id"
refCN = "container-name"
refCP = "/drives/b!vF-sdsdsds-sdsdsa-sdsd/root:/Folder/container-name"
refCPexp = "/Folder/container-name"
mal = models.NewMalware()
malDesc = "malware-description"
i = models.NewDriveItem()
createdBy = models.NewUser()
cbID = "created-by"
lm = models.NewUser()
lmID = "last-mod-by"
ref = models.NewItemReference()
refCID = "container-id"
refCN = "container-name"
refCP = "/drives/b!vF-sdsdsds-sdsdsa-sdsd/root:/Folder/container-name"
refCPexp = "/Folder/container-name"
mal = models.NewMalware()
malDesc = "malware-description"
)
cb.SetId(&cbID)
i.SetCreatedByUser(cb)
createdBy.SetId(&cbID)
i.SetCreatedByUser(createdBy)
lm.SetId(&lmID)
i.SetLastModifiedByUser(lm)

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -64,6 +65,7 @@ func (ctrl Controller) ConsumeRestoreCollections(
_ control.Options,
_ []data.RestoreCollection,
_ *fault.Bus,
_ *count.Bus,
) (*details.Details, error) {
return ctrl.Deets, ctrl.Err
}

View File

@ -25,6 +25,7 @@ import (
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -75,6 +76,7 @@ func ConsumeRestoreCollections(
dcs []data.RestoreCollection,
deets *details.Builder,
errs *fault.Bus,
ctr *count.Bus,
) (*support.ControllerOperationStatus, error) {
var (
restoreMetrics support.CollectionMetrics
@ -113,7 +115,8 @@ func ConsumeRestoreCollections(
caches,
deets,
opts.RestorePermissions,
errs)
errs,
ctr.Local())
if err != nil {
el.AddRecoverable(ctx, err)
}
@ -150,6 +153,7 @@ func RestoreCollection(
deets *details.Builder,
restorePerms bool, // TODD: move into restoreConfig
errs *fault.Bus,
ctr *count.Bus,
) (support.CollectionMetrics, error) {
var (
metrics = support.CollectionMetrics{}
@ -303,7 +307,8 @@ func RestoreCollection(
caches,
restorePerms,
itemData,
itemPath)
itemPath,
ctr)
// skipped items don't get counted, but they can error
if !skipped {
@ -353,6 +358,7 @@ func restoreItem(
restorePerms bool,
itemData data.Stream,
itemPath path.Path,
ctr *count.Bus,
) (details.ItemInfo, bool, error) {
itemUUID := itemData.UUID()
ctx = clues.Add(ctx, "item_id", itemUUID)
@ -367,7 +373,8 @@ func restoreItem(
restoreFolderID,
copyBuffer,
caches.collisionKeyToItemID,
itemData)
itemData,
ctr)
if err != nil {
if errors.Is(err, graph.ErrItemAlreadyExistsConflict) && restoreCfg.OnCollision == control.Skip {
return details.ItemInfo{}, true, nil
@ -424,7 +431,8 @@ func restoreItem(
restorePerms,
caches,
itemPath,
itemData)
itemData,
ctr)
if err != nil {
if errors.Is(err, graph.ErrItemAlreadyExistsConflict) && restoreCfg.OnCollision == control.Skip {
return details.ItemInfo{}, true, nil
@ -449,7 +457,8 @@ func restoreItem(
restorePerms,
caches,
itemPath,
itemData)
itemData,
ctr)
if err != nil {
if errors.Is(err, graph.ErrItemAlreadyExistsConflict) && restoreCfg.OnCollision == control.Skip {
return details.ItemInfo{}, true, nil
@ -471,6 +480,7 @@ func restoreV0File(
copyBuffer []byte,
collisionKeyToItemID map[string]api.DriveCollisionItem,
itemData data.Stream,
ctr *count.Bus,
) (details.ItemInfo, error) {
_, itemInfo, err := restoreFile(
ctx,
@ -482,7 +492,8 @@ func restoreV0File(
drivePath.DriveID,
restoreFolderID,
collisionKeyToItemID,
copyBuffer)
copyBuffer,
ctr)
if err != nil {
return itemInfo, clues.Wrap(err, "restoring file")
}
@ -502,6 +513,7 @@ func restoreV1File(
caches *restoreCaches,
itemPath path.Path,
itemData data.Stream,
ctr *count.Bus,
) (details.ItemInfo, error) {
trimmedName := strings.TrimSuffix(itemData.UUID(), metadata.DataFileSuffix)
@ -515,7 +527,8 @@ func restoreV1File(
drivePath.DriveID,
restoreFolderID,
caches.collisionKeyToItemID,
copyBuffer)
copyBuffer,
ctr)
if err != nil {
return details.ItemInfo{}, err
}
@ -561,6 +574,7 @@ func restoreV6File(
caches *restoreCaches,
itemPath path.Path,
itemData data.Stream,
ctr *count.Bus,
) (details.ItemInfo, error) {
trimmedName := strings.TrimSuffix(itemData.UUID(), metadata.DataFileSuffix)
@ -598,7 +612,8 @@ func restoreV6File(
drivePath.DriveID,
restoreFolderID,
caches.collisionKeyToItemID,
copyBuffer)
copyBuffer,
ctr)
if err != nil {
return details.ItemInfo{}, err
}
@ -795,6 +810,7 @@ func restoreFile(
driveID, parentFolderID string,
collisionKeyToItemID map[string]api.DriveCollisionItem,
copyBuffer []byte,
ctr *count.Bus,
) (string, details.ItemInfo, error) {
ctx, end := diagnostics.Span(ctx, "gc:oneDrive:restoreItem", diagnostics.Label("item_uuid", itemData.UUID()))
defer end()
@ -819,7 +835,9 @@ func restoreFile(
log.Debug("item collision")
if restoreCfg.OnCollision == control.Skip {
ctr.Inc(count.CollisionSkip)
log.Debug("skipping item with collision")
return "", details.ItemInfo{}, graph.ErrItemAlreadyExistsConflict
}

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -492,7 +493,8 @@ func (suite *RestoreUnitSuite) TestRestoreItem_collisionHandling() {
ID: uuid.NewString(),
Reader: mock.FileRespReadCloser(mock.DriveFilePayloadData),
},
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
test.expectSkipped(t, skip)
test.expectMock(t, rh)

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
)
@ -29,6 +30,7 @@ func (ctrl *Controller) ConsumeRestoreCollections(
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, error) {
ctx, end := diagnostics.Span(ctx, "m365:restore")
defer end()
@ -44,7 +46,7 @@ func (ctrl *Controller) ConsumeRestoreCollections(
switch sels.Service {
case selectors.ServiceExchange:
status, err = exchange.ConsumeRestoreCollections(ctx, ctrl.AC, restoreCfg, dcs, deets, errs)
status, err = exchange.ConsumeRestoreCollections(ctx, ctrl.AC, restoreCfg, dcs, deets, errs, ctr)
case selectors.ServiceOneDrive:
status, err = onedrive.ConsumeRestoreCollections(
ctx,
@ -54,7 +56,8 @@ func (ctrl *Controller) ConsumeRestoreCollections(
opts,
dcs,
deets,
errs)
errs,
ctr)
case selectors.ServiceSharePoint:
status, err = sharepoint.ConsumeRestoreCollections(
ctx,
@ -64,7 +67,8 @@ func (ctrl *Controller) ConsumeRestoreCollections(
opts,
dcs,
deets,
errs)
errs,
ctr)
default:
err = clues.Wrap(clues.New(sels.Service.String()), "service not supported")
}

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -35,6 +36,7 @@ func ConsumeRestoreCollections(
dcs []data.RestoreCollection,
deets *details.Builder,
errs *fault.Bus,
ctr *count.Bus,
) (*support.ControllerOperationStatus, error) {
var (
restoreMetrics support.CollectionMetrics
@ -74,7 +76,8 @@ func ConsumeRestoreCollections(
caches,
deets,
opts.RestorePermissions,
errs)
errs,
ctr)
case path.ListsCategory:
metrics, err = RestoreListCollection(

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -41,6 +42,7 @@ type (
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, error)
Wait() *data.CollectionStats

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/store"
)
@ -49,7 +50,8 @@ const (
type operation struct {
CreatedAt time.Time `json:"createdAt"`
Errors *fault.Bus `json:"errors"`
Errors *fault.Bus `json:"errors"`
Counter *count.Bus
Options control.Options `json:"options"`
Status OpStatus `json:"status"`
@ -67,6 +69,7 @@ func newOperation(
return operation{
CreatedAt: time.Now(),
Errors: fault.New(opts.FailureHandling == control.FailFast),
Counter: count.New(),
Options: opts,
bus: bus,

View File

@ -25,6 +25,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -178,6 +179,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
finalizeErrorHandling(ctx, op.Options, op.Errors, "running restore")
LogFaultErrors(ctx, op.Errors.Errors(), "running restore")
logger.Ctx(ctx).With("total_counts", op.Counter.Values()).Info("restore stats")
// -----
// Persistence
@ -266,7 +268,8 @@ func (op *RestoreOperation) do(
op.RestoreCfg,
op.Options,
dcs,
op.Errors)
op.Errors,
op.Counter)
if err != nil {
return nil, clues.Wrap(err, "restoring collections")
}
@ -324,6 +327,7 @@ func consumeRestoreCollections(
opts control.Options,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, error) {
complete := observe.MessageWithCompletion(ctx, "Restoring data")
defer func() {
@ -338,7 +342,8 @@ func consumeRestoreCollections(
restoreCfg,
opts,
dcs,
errs)
errs,
ctr)
if err != nil {
return nil, clues.Wrap(err, "restoring collections")
}

View File

@ -33,6 +33,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -425,7 +426,8 @@ func generateContainerOfItems(
restoreCfg,
opts,
dataColls,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
// have to wait here, both to ensure the process

108
src/pkg/count/count.go Normal file
View File

@ -0,0 +1,108 @@
package count
import (
"github.com/puzpuzpuz/xsync/v2"
)
// Bus handles threadsafe counting of arbitrarily keyed metrics.
type Bus struct {
parent *Bus
stats *xsync.MapOf[string, *xsync.Counter]
}
func New() *Bus {
return &Bus{
stats: xsync.NewMapOf[*xsync.Counter](),
}
}
// Local generates a bus with a parent link. Any value added to
// the local instance also updates the parent by the same increment.
// This allows you to maintain an isolated set of counts for a
// bounded context while automatically tallying the global total.
func (b *Bus) Local() *Bus {
bus := New()
bus.parent = b
return bus
}
func (b *Bus) getCounter(k key) *xsync.Counter {
xc, _ := b.stats.LoadOrStore(string(k), xsync.NewCounter())
return xc
}
// Inc increases the count by 1.
func (b *Bus) Inc(k key) {
if b == nil {
return
}
b.Add(k, 1)
}
// Inc increases the count by n.
func (b *Bus) Add(k key, n int64) {
if b == nil {
return
}
b.getCounter(k).Add(n)
if b.parent != nil {
b.parent.Add(k, n)
}
}
// Get returns the local count.
func (b *Bus) Get(k key) int64 {
if b == nil {
return -1
}
return b.getCounter(k).Value()
}
// Total returns the global count.
func (b *Bus) Total(k key) int64 {
if b == nil {
return -1
}
if b.parent != nil {
return b.parent.Total(k)
}
return b.Get(k)
}
// Values returns a map of all local values.
// Not a snapshot, and therefore not threadsafe.
func (b *Bus) Values() map[string]int64 {
if b == nil {
return map[string]int64{}
}
m := make(map[string]int64, b.stats.Size())
b.stats.Range(func(k string, v *xsync.Counter) bool {
m[k] = v.Value()
return true
})
return m
}
// TotalValues returns a map of all global values.
// Not a snapshot, and therefore not threadsafe.
func (b *Bus) TotalValues() map[string]int64 {
if b == nil {
return map[string]int64{}
}
if b.parent != nil {
return b.parent.TotalValues()
}
return b.Values()
}

180
src/pkg/count/count_test.go Normal file
View File

@ -0,0 +1,180 @@
package count
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
)
type CountUnitSuite struct {
tester.Suite
}
func TestCountUnitSuite(t *testing.T) {
suite.Run(t, &CountUnitSuite{Suite: tester.NewUnitSuite(t)})
}
const testKey = key("just-for-testing")
func (suite *CountUnitSuite) TestBus_Inc() {
newParent := func() *Bus {
parent := New()
parent.Inc(testKey)
return parent
}
table := []struct {
name string
skip bool
bus *Bus
expect int64
expectTotal int64
}{
{
name: "nil",
bus: nil,
expect: -1,
expectTotal: -1,
},
{
name: "none",
skip: true,
bus: newParent().Local(),
expect: 0,
expectTotal: 1,
},
{
name: "one",
bus: newParent().Local(),
expect: 1,
expectTotal: 2,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
if !test.skip {
test.bus.Inc(testKey)
}
result := test.bus.Get(testKey)
assert.Equal(t, test.expect, result)
resultTotal := test.bus.Total(testKey)
assert.Equal(t, test.expectTotal, resultTotal)
})
}
}
func (suite *CountUnitSuite) TestBus_Add() {
newParent := func() *Bus {
parent := New()
parent.Add(testKey, 2)
return parent
}
table := []struct {
name string
skip bool
bus *Bus
expect int64
expectTotal int64
}{
{
name: "nil",
bus: nil,
expect: -1,
expectTotal: -1,
},
{
name: "none",
skip: true,
bus: newParent().Local(),
expect: 0,
expectTotal: 2,
},
{
name: "some",
bus: newParent().Local(),
expect: 4,
expectTotal: 6,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
if !test.skip {
test.bus.Add(testKey, 4)
}
result := test.bus.Get(testKey)
assert.Equal(t, test.expect, result)
resultTotal := test.bus.Total(testKey)
assert.Equal(t, test.expectTotal, resultTotal)
})
}
}
func (suite *CountUnitSuite) TestBus_Values() {
table := []struct {
name string
bus func() *Bus
expect map[string]int64
expectTotal map[string]int64
}{
{
name: "nil",
bus: func() *Bus { return nil },
expect: map[string]int64{},
expectTotal: map[string]int64{},
},
{
name: "none",
bus: func() *Bus {
parent := New()
parent.Add(testKey, 2)
l := parent.Local()
return l
},
expect: map[string]int64{},
expectTotal: map[string]int64{string(testKey): 2},
},
{
name: "some",
bus: func() *Bus {
parent := New()
parent.Add(testKey, 2)
l := parent.Local()
l.Inc(testKey)
return l
},
expect: map[string]int64{string(testKey): 1},
expectTotal: map[string]int64{string(testKey): 3},
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
b := test.bus()
result := b.Values()
assert.Equal(t, test.expect, result)
resultTotal := b.TotalValues()
assert.Equal(t, test.expectTotal, resultTotal)
})
}
}

7
src/pkg/count/keys.go Normal file
View File

@ -0,0 +1,7 @@
package count
type key string
const (
CollisionSkip key = "collision-skip"
)

View File

@ -22,8 +22,12 @@ const (
ccRecipients = "ccRecipients"
createdDateTime = "createdDateTime"
displayName = "displayName"
emailAddresses = "emailAddresses"
givenName = "givenName"
mobilePhone = "mobilePhone"
parentFolderID = "parentFolderId"
receivedDateTime = "receivedDateTime"
sentDateTime = "sentDateTime"
surname = "surname"
toRecipients = "toRecipients"
userPrincipalName = "userPrincipalName"

View File

@ -317,7 +317,7 @@ func ContactInfo(contact models.Contactable) *details.ExchangeInfo {
}
func contactCollisionKeyProps() []string {
return idAnd(givenName)
return idAnd(givenName, surname, emailAddresses, mobilePhone)
}
// ContactCollisionKey constructs a key from the contactable's creation time and either displayName or given+surname.
@ -327,5 +327,17 @@ func ContactCollisionKey(item models.Contactable) string {
return ""
}
return ptr.Val(item.GetId())
var (
given = ptr.Val(item.GetGivenName())
sur = ptr.Val(item.GetSurname())
emails = item.GetEmailAddresses()
email string
phone = ptr.Val(item.GetMobilePhone())
)
for _, em := range emails {
email += ptr.Val(em.GetAddress())
}
return given + sur + email + phone
}

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
@ -52,16 +53,26 @@ func (suite *ContactsPagerIntgSuite) TestContacts_GetItemsInContainerByCollision
require.NoError(t, err, clues.ToCore(err))
cs := conts.GetValue()
expect := make([]string, 0, len(cs))
expectM := map[string]struct{}{}
for _, c := range cs {
expect = append(expect, api.ContactCollisionKey(c))
expectM[api.ContactCollisionKey(c)] = struct{}{}
}
expect := maps.Keys(expectM)
results, err := suite.its.ac.Contacts().GetItemsInContainerByCollisionKey(ctx, suite.its.userID, "contacts")
require.NoError(t, err, clues.ToCore(err))
require.Less(t, 0, len(results), "requires at least one result")
for _, k := range expect {
t.Log("expects key", k)
}
for k := range results {
t.Log("results key", k)
}
for k, v := range results {
assert.NotEmpty(t, k, "all keys should be populated")
assert.NotEmpty(t, v, "all values should be populated")

View File

@ -54,6 +54,9 @@ func (suite *DrivePagerIntgSuite) TestDrives_GetItemsInContainerByCollisionKey()
ctx, flush := tester.NewContext(t)
defer flush()
t.Log("drive", test.driveID)
t.Log("rootFolder", test.rootFolderID)
items, err := suite.its.ac.Stable.
Client().
Drives().
@ -73,10 +76,20 @@ func (suite *DrivePagerIntgSuite) TestDrives_GetItemsInContainerByCollisionKey()
"need at least one item to compare in user %s drive %s folder %s",
suite.its.userID, test.driveID, test.rootFolderID)
results, err := suite.its.ac.Drives().GetItemsInContainerByCollisionKey(ctx, test.driveID, test.rootFolderID)
results, err := suite.its.ac.
Drives().
GetItemsInContainerByCollisionKey(ctx, test.driveID, test.rootFolderID)
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, results)
for _, k := range expect {
t.Log("expects key", k)
}
for k := range results {
t.Log("results key", k)
}
for k, v := range results {
assert.NotEmpty(t, k, "all keys should be populated")
assert.NotEmpty(t, v, "all values should be populated")

View File

@ -701,7 +701,7 @@ func EventFromMap(ev map[string]any) (models.Eventable, error) {
}
func eventCollisionKeyProps() []string {
return idAnd("subject")
return idAnd("subject", "type", "start", "end", "attendees", "recurrence")
}
// EventCollisionKey constructs a key from the eventable's creation time, subject, and organizer.
@ -711,5 +711,39 @@ func EventCollisionKey(item models.Eventable) string {
return ""
}
return ptr.Val(item.GetSubject())
var (
subject = ptr.Val(item.GetSubject())
attendees = item.GetAttendees()
a string
oftype = ptr.Val(item.GetType())
t = oftype.String()
start = item.GetStart()
s string
end = item.GetEnd()
e string
recurs = item.GetRecurrence()
r string
)
for _, att := range attendees {
if att.GetEmailAddress() != nil {
a += ptr.Val(att.GetEmailAddress().GetAddress())
}
}
if start != nil {
s = ptr.Val(start.GetDateTime())
}
if end != nil {
e = ptr.Val(end.GetDateTime())
}
if recurs != nil && recurs.GetPattern() != nil {
r = ptr.Val(recurs.GetPattern().GetOdataType())
}
// this result gets hashed to ensure that an enormous list of attendees
// doesn't generate a multi-kb collision key.
return clues.ConcealWith(clues.SHA256, subject+a+t+s+e+r)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
@ -52,13 +53,17 @@ func (suite *EventsPagerIntgSuite) TestEvents_GetItemsInContainerByCollisionKey(
require.NoError(t, err, clues.ToCore(err))
es := evts.GetValue()
expect := make([]string, 0, len(es))
expectM := map[string]struct{}{}
for _, e := range es {
expect = append(expect, api.EventCollisionKey(e))
expectM[api.EventCollisionKey(e)] = struct{}{}
}
results, err := suite.its.ac.Events().GetItemsInContainerByCollisionKey(ctx, suite.its.userID, "calendar")
expect := maps.Keys(expectM)
results, err := suite.its.ac.
Events().
GetItemsInContainerByCollisionKey(ctx, suite.its.userID, "calendar")
require.NoError(t, err, clues.ToCore(err))
require.Less(t, 0, len(results), "requires at least one result")
@ -67,6 +72,14 @@ func (suite *EventsPagerIntgSuite) TestEvents_GetItemsInContainerByCollisionKey(
assert.NotEmpty(t, v, "all values should be populated")
}
for _, k := range expect {
t.Log("expects key", k)
}
for k := range results {
t.Log("results key", k)
}
for _, e := range expect {
_, ok := results[e]
assert.Truef(t, ok, "expected results to contain collision key: %s", e)

View File

@ -12,6 +12,7 @@ import (
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
@ -611,7 +612,7 @@ func UnwrapEmailAddress(contact models.Recipientable) string {
}
func mailCollisionKeyProps() []string {
return idAnd("subject")
return idAnd("subject", sentDateTime, receivedDateTime)
}
// MailCollisionKey constructs a key from the messageable's subject, sender, and recipients (to, cc, bcc).
@ -621,5 +622,11 @@ func MailCollisionKey(item models.Messageable) string {
return ""
}
return ptr.Val(item.GetSubject())
var (
subject = ptr.Val(item.GetSubject())
sent = ptr.Val(item.GetSentDateTime())
received = ptr.Val(item.GetReceivedDateTime())
)
return subject + dttm.Format(sent) + dttm.Format(received)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
@ -52,16 +53,26 @@ func (suite *MailPagerIntgSuite) TestMail_GetItemsInContainerByCollisionKey() {
require.NoError(t, err, clues.ToCore(err))
ms := msgs.GetValue()
expect := make([]string, 0, len(ms))
expectM := map[string]struct{}{}
for _, m := range ms {
expect = append(expect, api.MailCollisionKey(m))
expectM[api.MailCollisionKey(m)] = struct{}{}
}
expect := maps.Keys(expectM)
results, err := suite.its.ac.Mail().GetItemsInContainerByCollisionKey(ctx, suite.its.userID, "inbox")
require.NoError(t, err, clues.ToCore(err))
require.Less(t, 0, len(results), "requires at least one result")
for _, k := range expect {
t.Log("expects key", k)
}
for k := range results {
t.Log("results key", k)
}
for k, v := range results {
assert.NotEmpty(t, k, "all keys should be populated")
assert.NotEmpty(t, v, "all values should be populated")