add diagnostics to load_test (#983)

## Description

Adds Go runtime diagnostics tracking to load testing, including some trace regions.
Unfortunately, I couldn't find a third-party tracing library that doesn't depend on a
sidecar server to sample the application against. Therefore, this just starts with
something basic built on the standard library's runtime/trace and runtime/pprof.
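
For context, here is a minimal sketch of the region/log pattern applied throughout this change, assuming a `context.Context` is already threaded through the call chain; the function and label names below are illustrative, not taken from the diff:

```go
import (
	"context"
	"runtime/trace"
)

// restoreOne is a hypothetical call site; the region and log names are illustrative.
func restoreOne(ctx context.Context, itemID string) {
	// Mark this call's span in the execution trace.
	defer trace.StartRegion(ctx, "gc:exchange:restoreOne").End()

	// Attach a per-item event that shows up in the trace viewer.
	trace.Log(ctx, "gc:exchange:restoreOne:item", itemID)

	// ... actual restore work ...
}
```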

## Type of change

- [x] 🤖 Test

## Issue(s)

* #902

## Test Plan

- [x] 💪 Manual
- [x] 💚 E2E
10 changed files with 243 additions and 124 deletions

.gitignore vendored
View File

@ -23,3 +23,5 @@
/bin
/docker/bin
/website/dist
*/test_results/**

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"runtime/trace"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
@ -258,74 +259,26 @@ func RestoreExchangeDataCollections(
) (*support.ConnectorOperationStatus, error) {
var (
pathCounter = map[string]bool{}
rootFolder string
attempts, successes int
errs error
folderID, root string
isCancelled bool
// TODO policy to be updated from external source after completion of refactoring
policy = control.Copy
)
errUpdater := func(id string, err error) {
errs = support.WrapAndAppend(id, err, errs)
}
for _, dc := range dcs {
var (
items = dc.Items()
directory = dc.FullPath()
service = directory.Service()
category = directory.Category()
user = directory.ResourceOwner()
exit bool
directoryCheckFunc = generateRestoreContainerFunc(gs, user, category, dest.ContainerName)
)
a, s, root, canceled := restoreCollection(ctx, gs, dc, rootFolder, pathCounter, dest, policy, errUpdater)
attempts += a
successes += s
rootFolder = root
folderID, root, errs = directoryCheckFunc(ctx, errs, directory.String(), root, pathCounter)
if errs != nil { // assuming FailFast
if canceled {
break
}
if isCancelled {
break
}
for !exit {
select {
case <-ctx.Done():
errs = support.WrapAndAppend("context cancelled", ctx.Err(), errs)
isCancelled = true
case itemData, ok := <-items:
if !ok {
exit = true
continue
}
attempts++
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(itemData.ToReader())
if err != nil {
errs = support.WrapAndAppend(
itemData.UUID()+": byteReadError during RestoreDataCollection",
err,
errs,
)
continue
}
err = RestoreExchangeObject(ctx, buf.Bytes(), category, policy, gs, folderID, user)
if err != nil {
// More information to be here
errs = support.WrapAndAppend(
itemData.UUID()+": failed to upload RestoreExchangeObject: "+service.String()+"-"+category.String(),
err,
errs,
)
continue
}
successes++
}
}
}
status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
@ -333,6 +286,74 @@ func RestoreExchangeDataCollections(
return status, errs
}
// restoreCollection handles restoration of an individual collection.
func restoreCollection(
ctx context.Context,
gs graph.Service,
dc data.Collection,
rootFolder string,
pathCounter map[string]bool,
dest control.RestoreDestination,
policy control.CollisionPolicy,
errUpdater func(string, error),
) (int, int, string, bool) {
defer trace.StartRegion(ctx, "gc:exchange:restoreCollection").End()
trace.Log(ctx, "gc:exchange:restoreCollection", dc.FullPath().String())
var (
attempts, successes int
folderID string
err error
items = dc.Items()
directory = dc.FullPath()
service = directory.Service()
category = directory.Category()
user = directory.ResourceOwner()
directoryCheckFunc = generateRestoreContainerFunc(gs, user, category, dest.ContainerName)
)
folderID, root, err := directoryCheckFunc(ctx, err, directory.String(), rootFolder, pathCounter)
if err != nil { // assuming FailFast
errUpdater(directory.String(), err)
return 0, 0, rootFolder, false
}
for {
select {
case <-ctx.Done():
errUpdater("context cancelled", ctx.Err())
return attempts, successes, root, true
case itemData, ok := <-items:
if !ok {
return attempts, successes, root, false
}
attempts++
trace.Log(ctx, "gc:exchange:restoreCollection:item", itemData.UUID())
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(itemData.ToReader())
if err != nil {
errUpdater(itemData.UUID()+": byteReadError during RestoreDataCollection", err)
continue
}
err = RestoreExchangeObject(ctx, buf.Bytes(), category, policy, gs, folderID, user)
if err != nil {
// More information to be here
errUpdater(
itemData.UUID()+": failed to upload RestoreExchangeObject: "+service.String()+"-"+category.String(),
err)
continue
}
successes++
}
}
}
// generateRestoreContainerFunc utility function that holds logic for creating
// Root Directory or necessary functions based on path.CategoryType
func generateRestoreContainerFunc(

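Most of the exchange change above is a refactor: `RestoreExchangeDataCollections` now delegates each collection to `restoreCollection`, and failures flow back through the `errUpdater` callback so the caller keeps ownership of the aggregate error. Here is a minimal sketch of that callback pattern, with `wrapAndAppend` standing in for the project's `support.WrapAndAppend` (illustrative only):

```go
import (
	"context"
	"fmt"
)

// wrapAndAppend stands in for support.WrapAndAppend; illustrative only.
func wrapAndAppend(id string, err, errs error) error {
	wrapped := fmt.Errorf("%s: %w", id, err)
	if errs == nil {
		return wrapped
	}
	return fmt.Errorf("%v; %w", errs, wrapped)
}

// restoreAll owns the aggregate error; per-collection work only reports into it.
func restoreAll(ctx context.Context, collections []string, restore func(context.Context, string) error) error {
	var errs error

	errUpdater := func(id string, err error) {
		errs = wrapAndAppend(id, err, errs)
	}

	for _, c := range collections {
		if err := restore(ctx, c); err != nil {
			errUpdater(c, err)
		}
	}

	return errs
}
```
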
View File

@ -5,6 +5,7 @@ package connector
import (
"context"
"fmt"
"runtime/trace"
"sync"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
@ -34,6 +35,8 @@ type GraphConnector struct {
// wg is used to track completion of GC tasks
wg *sync.WaitGroup
region *trace.Region
// mutex used to synchronize updates to `status`
mu sync.Mutex
status support.ConnectorOperationStatus // contains the status of the last run status
@ -120,6 +123,8 @@ func (gs *graphService) EnableFailFast() {
// workspace. The users field is updated during this method
// iff the return value is true
func (gc *GraphConnector) setTenantUsers(ctx context.Context) error {
defer trace.StartRegion(ctx, "gc:setTenantUsers").End()
response, err := exchange.GetAllUsersForTenant(ctx, gc.graphService, "")
if err != nil {
return errors.Wrapf(
@ -248,6 +253,8 @@ func (gc *GraphConnector) RestoreDataCollections(
dest control.RestoreDestination,
dcs []data.Collection,
) error {
gc.region = trace.StartRegion(ctx, "connector:restore")
var (
status *support.ConnectorOperationStatus
err error
@ -340,7 +347,13 @@ func (gc *GraphConnector) createCollections(
// AwaitStatus waits for all gc tasks to complete and then returns status
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
defer func() {
if gc.region != nil {
gc.region.End()
}
}()
gc.wg.Wait()
return &gc.status
}
@ -384,6 +397,8 @@ func IsNonRecoverableError(e error) bool {
}
func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
defer trace.StartRegion(ctx, "gc:dataCollections:"+sels.Service.String()).End()
switch sels.Service {
case selectors.ServiceExchange:
return gc.ExchangeDataCollection(ctx, sels)

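One wrinkle in the connector change above: the `connector:restore` region can't be closed with a `defer` inside `RestoreDataCollections`, because the work finishes later, so the region is stored on the struct and ended in `AwaitStatus`. A minimal sketch of that shape (names illustrative); note that `runtime/trace` expects `End` to be called on the same goroutine that started the region, which holds as long as the same caller drives both methods:

```go
import (
	"context"
	"runtime/trace"
	"sync"
)

type connector struct {
	wg     sync.WaitGroup
	region *trace.Region
}

func (c *connector) StartRestore(ctx context.Context) {
	// Work continues after this method returns, so keep the region open.
	c.region = trace.StartRegion(ctx, "connector:restore")
	// ... launch goroutines guarded by c.wg.Add / c.wg.Done ...
}

func (c *connector) AwaitStatus() {
	defer func() {
		if c.region != nil {
			c.region.End()
		}
	}()
	c.wg.Wait()
}
```
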
View File

@ -3,6 +3,7 @@ package onedrive
import (
"context"
"io"
"runtime/trace"
"github.com/pkg/errors"
@ -53,18 +54,46 @@ func RestoreCollections(
var (
total, restored int
restoreErrors error
copyBuffer = make([]byte, copyBufferSize)
restoreContainerName = dest.ContainerName
)
errUpdater := func(id string, err error) {
restoreErrors = support.WrapAndAppend(id, err, restoreErrors)
}
// Iterate through the data collections and restore the contents of each
for _, dc := range dcs {
directory := dc.FullPath()
t, r, canceled := restoreCollection(ctx, service, dc, dest.ContainerName, errUpdater)
total += t
restored += r
if canceled {
break
}
}
return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
}
// restoreCollection handles restoration of an individual collection.
func restoreCollection(
ctx context.Context,
service graph.Service,
dc data.Collection,
restoreContainerName string,
errUpdater func(string, error),
) (int, int, bool) {
defer trace.StartRegion(ctx, "gc:oneDrive:restoreCollection").End()
var (
total, restored int
copyBuffer = make([]byte, copyBufferSize)
directory = dc.FullPath()
)
drivePath, err := toOneDrivePath(directory)
if err != nil {
restoreErrors = support.WrapAndAppend(directory.String(), err, restoreErrors)
continue
errUpdater(directory.String(), err)
return 0, 0, false
}
// Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
@ -72,37 +101,36 @@ func RestoreCollections(
// i.e. Restore into `<drive>/root:/<restoreContainerName>/<original folder path>`
restoreFolderElements := []string{restoreContainerName}
restoreFolderElements = append(restoreFolderElements, drivePath.folders...)
trace.Log(ctx, "gc:oneDrive:restoreCollection", directory.String())
logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements)
// Create restore folders and get the folder ID of the folder the data stream will be restored in
restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
if err != nil {
restoreErrors = support.WrapAndAppend(directory.String(), errors.Wrapf(err, "failed to create folders %v",
restoreFolderElements), restoreErrors)
continue
errUpdater(directory.String(), errors.Wrapf(err, "failed to create folders %v", restoreFolderElements))
return 0, 0, false
}
// Restore items from the collection
exit := false
items := dc.Items()
for !exit {
for {
select {
case <-ctx.Done():
return nil, support.WrapAndAppend("context cancelled", ctx.Err(), restoreErrors)
errUpdater("context canceled", ctx.Err())
return total, restored, true
case itemData, ok := <-items:
if !ok {
exit = true
break
return total, restored, false
}
total++
err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
if err != nil {
restoreErrors = support.WrapAndAppend(itemData.UUID(), err, restoreErrors)
errUpdater(itemData.UUID(), err)
continue
}
@ -111,9 +139,6 @@ func RestoreCollections(
}
}
return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
}
// createRestoreFolders creates the restore folder hieararchy in the specified drive and returns the folder ID
// of the last folder entry in the hiearchy
func createRestoreFolders(ctx context.Context, service graph.Service, driveID string, restoreFolders []string,
@ -163,7 +188,10 @@ func createRestoreFolders(ctx context.Context, service graph.Service, driveID st
func restoreItem(ctx context.Context, service graph.Service, itemData data.Stream, driveID, parentFolderID string,
copyBuffer []byte,
) error {
defer trace.StartRegion(ctx, "gc:oneDrive:restoreItem").End()
itemName := itemData.UUID()
trace.Log(ctx, "gc:oneDrive:restoreItem", itemName)
// Get the stream size (needed to create the upload session)
ss, ok := itemData.(data.StreamSize)

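The OneDrive restore gets the same per-collection extraction. The item loop drops the `exit` flag in favor of direct returns, and reports a `canceled` flag so the caller can stop iterating the remaining collections. A minimal sketch of that drain loop, assuming items arrive on a channel that is closed once the collection is exhausted:

```go
import "context"

// drainItems is illustrative; the real helpers also restore each item.
func drainItems(ctx context.Context, items <-chan string) (processed int, canceled bool) {
	for {
		select {
		case <-ctx.Done():
			// Tell the caller to stop iterating further collections.
			return processed, true
		case item, ok := <-items:
			if !ok {
				// Channel closed: this collection is fully consumed.
				return processed, false
			}
			_ = item // ... restore the item here ...
			processed++
		}
	}
}
```
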
View File

@ -2,6 +2,7 @@ package kopia
import (
"context"
"runtime/trace"
"sync"
"sync/atomic"
@ -192,6 +193,8 @@ func getStreamItemFunc(
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
defer trace.StartRegion(ctx, "kopia:getStreamItemFunc").End()
// Collect all errors and return them at the end so that iteration for this
// directory doesn't end early.
var errs *multierror.Error
@ -230,6 +233,8 @@ func getStreamItemFunc(
continue
}
trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String())
ei, ok := e.(data.StreamInfo)
if !ok {
errs = multierror.Append(
@ -383,6 +388,8 @@ func (w Wrapper) BackupCollections(
return nil, nil, errNotConnected
}
defer trace.StartRegion(ctx, "kopia:backupCollections").End()
progress := &corsoProgress{
pending: map[string]*itemDetails{},
deets: &details.Details{},
@ -556,6 +563,8 @@ func (w Wrapper) RestoreMultipleItems(
paths []path.Path,
bcounter byteCounter,
) ([]data.Collection, error) {
defer trace.StartRegion(ctx, "kopia:restore:multiple").End()
if len(paths) == 0 {
return nil, errors.WithStack(errNoRestorePath)
}

View File

@ -2,6 +2,7 @@ package operations
import (
"context"
"runtime/trace"
"time"
"github.com/google/uuid"
@ -83,6 +84,8 @@ type backupStats struct {
// Run begins a synchronous backup operation.
func (op *BackupOperation) Run(ctx context.Context) (err error) {
defer trace.StartRegion(ctx, "operations:backup:run").End()
var (
opStats backupStats
backupDetails *details.Details

View File

@ -2,6 +2,7 @@ package operations
import (
"context"
"runtime/trace"
"time"
"github.com/google/uuid"
@ -93,6 +94,8 @@ type restoreStats struct {
// Run begins a synchronous restore operation.
func (op *RestoreOperation) Run(ctx context.Context) (err error) {
defer trace.StartRegion(ctx, "operations:restore:run").End()
startTime := time.Now()
// persist operation results to the model store on exit

View File

@ -94,7 +94,7 @@ func Initialize(
return nil, err
}
r := repository{
r := &repository{
ID: uuid.New(),
Version: "v1",
Account: acct,
@ -106,7 +106,7 @@ func Initialize(
r.Bus.Event(ctx, events.RepoInit, nil)
return &r, nil
return r, nil
}
// Connect will:
@ -139,16 +139,14 @@ func Connect(
}
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
r := repository{
return &repository{
Version: "v1",
Account: acct,
Storage: s,
Bus: events.NewBus(s, acct.ID(), opts),
dataLayer: w,
modelStore: ms,
}
return &r, nil
}, nil
}
func (r *repository) Close(ctx context.Context) error {

View File

@ -2,6 +2,7 @@ package repository_test
import (
"context"
"runtime/pprof"
"testing"
"github.com/stretchr/testify/assert"
@ -13,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/operations"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger"
@ -53,7 +55,16 @@ func runBackupLoadTest(
) {
//revive:enable:context-as-argument
t.Run("backup_"+name, func(t *testing.T) {
require.NoError(t, b.Run(ctx), "running backup")
var (
err error
labels = pprof.Labels("backup_load_test", name)
)
pprof.Do(ctx, labels, func(ctx context.Context) {
err = b.Run(ctx)
})
require.NoError(t, err, "running backup")
require.NotEmpty(t, b.Results, "has results after run")
assert.NotEmpty(t, b.Results.BackupID, "has an ID after run")
assert.Equal(t, b.Status, operations.Completed, "backup status")
@ -75,7 +86,16 @@ func runBackupListLoadTest(
) {
//revive:enable:context-as-argument
t.Run("backup_list_"+name, func(t *testing.T) {
bs, err := r.Backups(ctx)
var (
err error
bs []backup.Backup
labels = pprof.Labels("list_load_test", name)
)
pprof.Do(ctx, labels, func(ctx context.Context) {
bs, err = r.Backups(ctx)
})
require.NoError(t, err, "retrieving backups")
require.Less(t, 0, len(bs), "at least one backup is recorded")
@ -105,7 +125,17 @@ func runBackupDetailsLoadTest(
require.NotEmpty(t, backupID, "backup ID to retrieve deails")
t.Run("backup_details_"+name, func(t *testing.T) {
ds, b, err := r.BackupDetails(ctx, backupID)
var (
err error
b *backup.Backup
ds *details.Details
labels = pprof.Labels("details_load_test", name)
)
pprof.Do(ctx, labels, func(ctx context.Context) {
ds, b, err = r.BackupDetails(ctx, backupID)
})
require.NoError(t, err, "retrieving details in backup "+backupID)
require.NotNil(t, ds, "backup details must exist")
require.NotNil(t, b, "backup must exist")
@ -134,8 +164,16 @@ func runRestoreLoadTest(
) {
//revive:enable:context-as-argument
t.Run("restore_"+name, func(t *testing.T) {
t.Skip("skipping restore handling while investigating performance")
require.NoError(t, r.Run(ctx), "running restore")
var (
err error
labels = pprof.Labels("restore_load_test", name)
)
pprof.Do(ctx, labels, func(ctx context.Context) {
err = r.Run(ctx)
})
require.NoError(t, err, "running restore")
require.NotEmpty(t, r.Results, "has results after run")
assert.Equal(t, r.Status, operations.Completed, "restore status")
assert.Less(t, 0, r.Results.ItemsRead, "items read")
@ -244,6 +282,7 @@ func TestRepositoryLoadTestOneDriveSuite(t *testing.T) {
func (suite *RepositoryLoadTestOneDriveSuite) SetupSuite() {
t := suite.T()
t.Skip("temp issue-902-live")
t.Parallel()
suite.ctx, suite.repo, suite.acct, suite.st = initM365Repo(t)
}
@ -268,8 +307,6 @@ func (suite *RepositoryLoadTestOneDriveSuite) TestOneDrive() {
service = "one_drive"
)
t.Skip("temp issue-902-live")
m356User := tester.M365UserID(t)
// backup

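The load test changes wrap each operation in `pprof.Do`, so any profile samples collected while that operation runs carry a label naming the sub-test. A minimal sketch of the pattern, with `runOperation` standing in for `b.Run` / `r.Run` / `r.Backups`:

```go
import (
	"context"
	"runtime/pprof"
	"testing"
)

// runLabeled is illustrative; runOperation stands in for the operation under test.
func runLabeled(ctx context.Context, t *testing.T, name string, runOperation func(context.Context) error) {
	var err error

	// CPU (and goroutine) profile samples taken inside the closure carry the
	// load_test=<name> label, so profiles can be sliced per sub-test.
	pprof.Do(ctx, pprof.Labels("load_test", name), func(ctx context.Context) {
		err = runOperation(ctx)
	})

	if err != nil {
		t.Fatalf("running %s: %v", name, err)
	}
}
```
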
View File

@ -2,6 +2,7 @@ package selectors
import (
"context"
"runtime/trace"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/filters"
@ -210,6 +211,8 @@ func reduce[T scopeT, C categoryT](
s Selector,
dataCategories map[path.CategoryType]C,
) *details.Details {
defer trace.StartRegion(ctx, "selectors:reduce").End()
if deets == nil {
return nil
}
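
To actually see the regions and logs added here, the test run needs an execution trace, either via `go test -trace=trace.out` followed by `go tool trace trace.out`, or programmatically, roughly as sketched below (the file path and placement are illustrative; the new `*/test_results/**` .gitignore entry suggests per-test output directories):

```go
import (
	"os"
	"runtime/trace"
	"testing"
)

// TestWithTrace is a sketch: capture an execution trace for one test run.
func TestWithTrace(t *testing.T) {
	f, err := os.Create("trace.out") // illustrative path; repo output appears to live under test_results/
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	if err := trace.Start(f); err != nil {
		t.Fatal(err)
	}
	defer trace.Stop()

	// ... run the load test body; the regions and trace.Log calls added in this
	// change show up under the viewer's user-defined regions and logs ...
}
```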