diff --git a/.gitignore b/.gitignore
index 3fabb09e9..f3bb16eb2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
 /bin
 /docker/bin
 /website/dist
+
+*/test_results/**
\ No newline at end of file
diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go
index de8530bb6..b9f4957c6 100644
--- a/src/internal/connector/exchange/service_restore.go
+++ b/src/internal/connector/exchange/service_restore.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "fmt"
+    "runtime/trace"

     "github.com/microsoftgraph/msgraph-sdk-go/models"
     "github.com/pkg/errors"
@@ -258,74 +259,26 @@ func RestoreExchangeDataCollections(
 ) (*support.ConnectorOperationStatus, error) {
     var (
         pathCounter = map[string]bool{}
+        rootFolder  string

         attempts, successes int
         errs                error
-        folderID, root      string
-        isCancelled         bool

         // TODO policy to be updated from external source after completion of refactoring
         policy = control.Copy
     )

+    errUpdater := func(id string, err error) {
+        errs = support.WrapAndAppend(id, err, errs)
+    }
+
     for _, dc := range dcs {
-        var (
-            items              = dc.Items()
-            directory          = dc.FullPath()
-            service            = directory.Service()
-            category           = directory.Category()
-            user               = directory.ResourceOwner()
-            exit               bool
-            directoryCheckFunc = generateRestoreContainerFunc(gs, user, category, dest.ContainerName)
-        )
+        a, s, root, canceled := restoreCollection(ctx, gs, dc, rootFolder, pathCounter, dest, policy, errUpdater)
+        attempts += a
+        successes += s
+        rootFolder = root

-        folderID, root, errs = directoryCheckFunc(ctx, errs, directory.String(), root, pathCounter)
-        if errs != nil { // assuming FailFast
+        if canceled {
             break
         }
-
-        if isCancelled {
-            break
-        }
-
-        for !exit {
-            select {
-            case <-ctx.Done():
-                errs = support.WrapAndAppend("context cancelled", ctx.Err(), errs)
-                isCancelled = true
-
-            case itemData, ok := <-items:
-                if !ok {
-                    exit = true
-                    continue
-                }
-                attempts++
-
-                buf := &bytes.Buffer{}
-
-                _, err := buf.ReadFrom(itemData.ToReader())
-                if err != nil {
-                    errs = support.WrapAndAppend(
-                        itemData.UUID()+": byteReadError during RestoreDataCollection",
-                        err,
-                        errs,
-                    )
-
-                    continue
-                }
-
-                err = RestoreExchangeObject(ctx, buf.Bytes(), category, policy, gs, folderID, user)
-                if err != nil {
-                    // More information to be here
-                    errs = support.WrapAndAppend(
-                        itemData.UUID()+": failed to upload RestoreExchangeObject: "+service.String()+"-"+category.String(),
-                        err,
-                        errs,
-                    )
-
-                    continue
-                }
-                successes++
-            }
-        }
     }

     status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
@@ -333,6 +286,74 @@ func RestoreExchangeDataCollections(
     return status, errs
 }

+// restoreCollection handles restoration of an individual collection.
+func restoreCollection(
+    ctx context.Context,
+    gs graph.Service,
+    dc data.Collection,
+    rootFolder string,
+    pathCounter map[string]bool,
+    dest control.RestoreDestination,
+    policy control.CollisionPolicy,
+    errUpdater func(string, error),
+) (int, int, string, bool) {
+    defer trace.StartRegion(ctx, "gc:exchange:restoreCollection").End()
+    trace.Log(ctx, "gc:exchange:restoreCollection", dc.FullPath().String())
+
+    var (
+        attempts, successes int
+        folderID            string
+        err                 error
+        items               = dc.Items()
+        directory           = dc.FullPath()
+        service             = directory.Service()
+        category            = directory.Category()
+        user                = directory.ResourceOwner()
+        directoryCheckFunc  = generateRestoreContainerFunc(gs, user, category, dest.ContainerName)
+    )
+
+    folderID, root, err := directoryCheckFunc(ctx, err, directory.String(), rootFolder, pathCounter)
+    if err != nil { // assuming FailFast
+        errUpdater(directory.String(), err)
+        return 0, 0, rootFolder, false
+    }
+
+    for {
+        select {
+        case <-ctx.Done():
+            errUpdater("context cancelled", ctx.Err())
+            return attempts, successes, root, true
+
+        case itemData, ok := <-items:
+            if !ok {
+                return attempts, successes, root, false
+            }
+            attempts++
+
+            trace.Log(ctx, "gc:exchange:restoreCollection:item", itemData.UUID())
+
+            buf := &bytes.Buffer{}
+
+            _, err := buf.ReadFrom(itemData.ToReader())
+            if err != nil {
+                errUpdater(itemData.UUID()+": byteReadError during RestoreDataCollection", err)
+                continue
+            }
+
+            err = RestoreExchangeObject(ctx, buf.Bytes(), category, policy, gs, folderID, user)
+            if err != nil {
+                // TODO: add more context to this error
+                errUpdater(
+                    itemData.UUID()+": failed to upload RestoreExchangeObject: "+service.String()+"-"+category.String(),
+                    err)
+
+                continue
+            }
+            successes++
+        }
+    }
+}
+
 // generateRestoreContainerFunc utility function that holds logic for creating
 // Root Directory or necessary functions based on path.CategoryType
 func generateRestoreContainerFunc(
diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go
index e736af0aa..b3f405e29 100644
--- a/src/internal/connector/graph_connector.go
+++ b/src/internal/connector/graph_connector.go
@@ -5,6 +5,7 @@ package connector
 import (
     "context"
     "fmt"
+    "runtime/trace"
     "sync"

     msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
@@ -33,7 +34,9 @@ type GraphConnector struct {
     credentials account.M365Config

     // wg is used to track completion of GC tasks
-    wg *sync.WaitGroup
+    wg     *sync.WaitGroup
+    region *trace.Region
+
+    // mutex used to synchronize updates to `status`
     mu     sync.Mutex
     status support.ConnectorOperationStatus // contains the status of the last run status
@@ -120,6 +123,8 @@ func (gs *graphService) EnableFailFast() {
 // workspace. The users field is updated during this method
 // iff the return value is true
 func (gc *GraphConnector) setTenantUsers(ctx context.Context) error {
+    defer trace.StartRegion(ctx, "gc:setTenantUsers").End()
+
     response, err := exchange.GetAllUsersForTenant(ctx, gc.graphService, "")
     if err != nil {
         return errors.Wrapf(
@@ -248,6 +253,8 @@ func (gc *GraphConnector) RestoreDataCollections(
     dest control.RestoreDestination,
     dcs []data.Collection,
 ) error {
+    gc.region = trace.StartRegion(ctx, "connector:restore")
+
     var (
         status *support.ConnectorOperationStatus
         err    error
@@ -340,7 +347,13 @@ func (gc *GraphConnector) createCollections(

 // AwaitStatus waits for all gc tasks to complete and then returns status
 func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
+    defer func() {
+        if gc.region != nil {
+            gc.region.End()
+        }
+    }()
     gc.wg.Wait()
+
     return &gc.status
 }

@@ -384,6 +397,8 @@ func IsNonRecoverableError(e error) bool {
 }

 func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
+    defer trace.StartRegion(ctx, "gc:dataCollections:"+sels.Service.String()).End()
+
     switch sels.Service {
     case selectors.ServiceExchange:
         return gc.ExchangeDataCollection(ctx, sels)
diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go
index 0619964c7..058a811ff 100644
--- a/src/internal/connector/onedrive/restore.go
+++ b/src/internal/connector/onedrive/restore.go
@@ -3,6 +3,7 @@ package onedrive
 import (
     "context"
     "io"
+    "runtime/trace"

     "github.com/pkg/errors"

@@ -51,69 +52,93 @@ func RestoreCollections(
     dcs []data.Collection,
 ) (*support.ConnectorOperationStatus, error) {
     var (
-        total, restored      int
-        restoreErrors        error
-        copyBuffer           = make([]byte, copyBufferSize)
-        restoreContainerName = dest.ContainerName
+        total, restored int
+        restoreErrors   error
     )

+    errUpdater := func(id string, err error) {
+        restoreErrors = support.WrapAndAppend(id, err, restoreErrors)
+    }
+
     // Iterate through the data collections and restore the contents of each
     for _, dc := range dcs {
-        directory := dc.FullPath()
+        t, r, canceled := restoreCollection(ctx, service, dc, dest.ContainerName, errUpdater)
+        total += t
+        restored += r

-        drivePath, err := toOneDrivePath(directory)
-        if err != nil {
-            restoreErrors = support.WrapAndAppend(directory.String(), err, restoreErrors)
-            continue
-        }
-
-        // Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
-        // from the backup under this the restore folder instead of root)
-        // i.e. Restore into `/root://`
-
-        restoreFolderElements := []string{restoreContainerName}
-
-        restoreFolderElements = append(restoreFolderElements, drivePath.folders...)
-
-        logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements)
-
-        // Create restore folders and get the folder ID of the folder the data stream will be restored in
-        restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
-        if err != nil {
-            restoreErrors = support.WrapAndAppend(directory.String(), errors.Wrapf(err, "failed to create folders %v",
-                restoreFolderElements), restoreErrors)
-            continue
-        }
-
-        // Restore items from the collection
-        exit := false
-        items := dc.Items()
-
-        for !exit {
-            select {
-            case <-ctx.Done():
-                return nil, support.WrapAndAppend("context cancelled", ctx.Err(), restoreErrors)
-            case itemData, ok := <-items:
-                if !ok {
-                    exit = true
-                    break
-                }
-                total++
-
-                err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
-                if err != nil {
-                    restoreErrors = support.WrapAndAppend(itemData.UUID(), err, restoreErrors)
-                    continue
-                }
-
-                restored++
-            }
+        if canceled {
+            break
         }
     }

     return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
 }

+// restoreCollection handles restoration of an individual collection.
+func restoreCollection(
+    ctx context.Context,
+    service graph.Service,
+    dc data.Collection,
+    restoreContainerName string,
+    errUpdater func(string, error),
+) (int, int, bool) {
+    defer trace.StartRegion(ctx, "gc:oneDrive:restoreCollection").End()
+
+    var (
+        total, restored int
+        copyBuffer      = make([]byte, copyBufferSize)
+        directory       = dc.FullPath()
+    )
+
+    drivePath, err := toOneDrivePath(directory)
+    if err != nil {
+        errUpdater(directory.String(), err)
+        return 0, 0, false
+    }
+
+    // Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
+    // from the backup under the restore folder instead of root)
+    // i.e. Restore into `/root://`
+
+    restoreFolderElements := []string{restoreContainerName}
+    restoreFolderElements = append(restoreFolderElements, drivePath.folders...)
+
+    trace.Log(ctx, "gc:oneDrive:restoreCollection", directory.String())
+    logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements)
+
+    // Create restore folders and get the folder ID of the folder the data stream will be restored in
+    restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
+    if err != nil {
+        errUpdater(directory.String(), errors.Wrapf(err, "failed to create folders %v", restoreFolderElements))
+        return 0, 0, false
+    }
+
+    // Restore items from the collection
+    items := dc.Items()
+
+    for {
+        select {
+        case <-ctx.Done():
+            errUpdater("context canceled", ctx.Err())
+            return total, restored, true
+
+        case itemData, ok := <-items:
+            if !ok {
+                return total, restored, false
+            }
+            total++
+
+            err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
+            if err != nil {
+                errUpdater(itemData.UUID(), err)
+                continue
+            }
+
+            restored++
+        }
+    }
+}
+
 // createRestoreFolders creates the restore folder hieararchy in the specified drive and returns the folder ID
 // of the last folder entry in the hiearchy
 func createRestoreFolders(ctx context.Context, service graph.Service, driveID string, restoreFolders []string,
@@ -163,7 +188,10 @@ func createRestoreFolders(ctx context.Context, service graph.Service, driveID st
 func restoreItem(ctx context.Context, service graph.Service, itemData data.Stream, driveID, parentFolderID string,
     copyBuffer []byte,
 ) error {
+    defer trace.StartRegion(ctx, "gc:oneDrive:restoreItem").End()
+
     itemName := itemData.UUID()
+    trace.Log(ctx, "gc:oneDrive:restoreItem", itemName)

     // Get the stream size (needed to create the upload session)
     ss, ok := itemData.(data.StreamSize)
diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go
index 7bfec71fd..dd2aae9e9 100644
--- a/src/internal/kopia/wrapper.go
+++ b/src/internal/kopia/wrapper.go
@@ -2,6 +2,7 @@ package kopia
 import (
     "context"
+    "runtime/trace"
     "sync"
     "sync/atomic"
@@ -192,6 +193,8 @@ func getStreamItemFunc(
     progress *corsoProgress,
 ) func(context.Context, func(context.Context, fs.Entry) error) error {
     return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
+        defer trace.StartRegion(ctx, "kopia:getStreamItemFunc").End()
+
         // Collect all errors and return them at the end so that iteration for this
         // directory doesn't end early.
         var errs *multierror.Error
@@ -230,6 +233,8 @@ func getStreamItemFunc(
                 continue
             }

+            trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String())
+
             ei, ok := e.(data.StreamInfo)
             if !ok {
                 errs = multierror.Append(
@@ -383,6 +388,8 @@ func (w Wrapper) BackupCollections(
         return nil, nil, errNotConnected
     }

+    defer trace.StartRegion(ctx, "kopia:backupCollections").End()
+
     progress := &corsoProgress{
         pending: map[string]*itemDetails{},
         deets:   &details.Details{},
@@ -556,6 +563,8 @@ func (w Wrapper) RestoreMultipleItems(
     paths []path.Path,
     bcounter byteCounter,
 ) ([]data.Collection, error) {
+    defer trace.StartRegion(ctx, "kopia:restore:multiple").End()
+
     if len(paths) == 0 {
         return nil, errors.WithStack(errNoRestorePath)
     }
diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go
index 4da66d6b7..636fb8ca9 100644
--- a/src/internal/operations/backup.go
+++ b/src/internal/operations/backup.go
@@ -2,6 +2,7 @@ package operations
 import (
     "context"
+    "runtime/trace"
     "time"

     "github.com/google/uuid"
@@ -83,6 +84,8 @@ type backupStats struct {
 // Run begins a synchronous backup operation.
 func (op *BackupOperation) Run(ctx context.Context) (err error) {
+    defer trace.StartRegion(ctx, "operations:backup:run").End()
+
     var (
         opStats       backupStats
         backupDetails *details.Details
diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go
index e8722487f..36d1f0c21 100644
--- a/src/internal/operations/restore.go
+++ b/src/internal/operations/restore.go
@@ -2,6 +2,7 @@ package operations
 import (
     "context"
+    "runtime/trace"
     "time"

     "github.com/google/uuid"
@@ -93,6 +94,8 @@ type restoreStats struct {
 // Run begins a synchronous restore operation.
 func (op *RestoreOperation) Run(ctx context.Context) (err error) {
+    defer trace.StartRegion(ctx, "operations:restore:run").End()
+
     startTime := time.Now()

     // persist operation results to the model store on exit
diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go
index dde0a9565..b65d3b41a 100644
--- a/src/pkg/repository/repository.go
+++ b/src/pkg/repository/repository.go
@@ -94,7 +94,7 @@ func Initialize(
         return nil, err
     }

-    r := repository{
+    r := &repository{
         ID:      uuid.New(),
         Version: "v1",
         Account: acct,
@@ -106,7 +106,7 @@ func Initialize(

     r.Bus.Event(ctx, events.RepoInit, nil)

-    return &r, nil
+    return r, nil
 }

 // Connect will:
@@ -139,16 +139,14 @@ func Connect(
     }

     // todo: ID and CreatedAt should get retrieved from a stored kopia config.
-    r := repository{
+    return &repository{
         Version:    "v1",
         Account:    acct,
         Storage:    s,
         Bus:        events.NewBus(s, acct.ID(), opts),
         dataLayer:  w,
         modelStore: ms,
-    }
-
-    return &r, nil
+    }, nil
 }

 func (r *repository) Close(ctx context.Context) error {
diff --git a/src/pkg/repository/repository_load_test.go b/src/pkg/repository/repository_load_test.go
index 3fadaaf34..e0ed4ebab 100644
--- a/src/pkg/repository/repository_load_test.go
+++ b/src/pkg/repository/repository_load_test.go
@@ -2,6 +2,7 @@ package repository_test
 import (
     "context"
+    "runtime/pprof"
     "testing"

     "github.com/stretchr/testify/assert"
@@ -13,6 +14,7 @@ import (
     "github.com/alcionai/corso/src/internal/operations"
     "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/pkg/account"
+    "github.com/alcionai/corso/src/pkg/backup"
     "github.com/alcionai/corso/src/pkg/backup/details"
     "github.com/alcionai/corso/src/pkg/control"
     "github.com/alcionai/corso/src/pkg/logger"
@@ -53,7 +55,16 @@ func runBackupLoadTest(
 ) {
     //revive:enable:context-as-argument
     t.Run("backup_"+name, func(t *testing.T) {
-        require.NoError(t, b.Run(ctx), "running backup")
+        var (
+            err    error
+            labels = pprof.Labels("backup_load_test", name)
+        )
+
+        pprof.Do(ctx, labels, func(ctx context.Context) {
+            err = b.Run(ctx)
+        })
+
+        require.NoError(t, err, "running backup")
         require.NotEmpty(t, b.Results, "has results after run")
         assert.NotEmpty(t, b.Results.BackupID, "has an ID after run")
         assert.Equal(t, b.Status, operations.Completed, "backup status")
@@ -75,7 +86,16 @@ func runBackupListLoadTest(
 ) {
     //revive:enable:context-as-argument
     t.Run("backup_list_"+name, func(t *testing.T) {
-        bs, err := r.Backups(ctx)
+        var (
+            err    error
+            bs     []backup.Backup
+            labels = pprof.Labels("list_load_test", name)
+        )
+
+        pprof.Do(ctx, labels, func(ctx context.Context) {
+            bs, err = r.Backups(ctx)
+        })
+
         require.NoError(t, err, "retrieving backups")
         require.Less(t, 0, len(bs), "at least one backup is recorded")
@@ -105,7 +125,17 @@ func runBackupDetailsLoadTest(
     require.NotEmpty(t, backupID, "backup ID to retrieve deails")

     t.Run("backup_details_"+name, func(t *testing.T) {
-        ds, b, err := r.BackupDetails(ctx, backupID)
+        var (
+            err    error
+            b      *backup.Backup
+            ds     *details.Details
+            labels = pprof.Labels("details_load_test", name)
+        )
+
+        pprof.Do(ctx, labels, func(ctx context.Context) {
+            ds, b, err = r.BackupDetails(ctx, backupID)
+        })
+
         require.NoError(t, err, "retrieving details in backup "+backupID)
         require.NotNil(t, ds, "backup details must exist")
         require.NotNil(t, b, "backup must exist")
@@ -134,8 +164,16 @@ func runRestoreLoadTest(
 ) {
     //revive:enable:context-as-argument
     t.Run("restore_"+name, func(t *testing.T) {
-        t.Skip("skipping restore handling while investigating performance")
-        require.NoError(t, r.Run(ctx), "running restore")
+        var (
+            err    error
+            labels = pprof.Labels("restore_load_test", name)
+        )
+
+        pprof.Do(ctx, labels, func(ctx context.Context) {
+            err = r.Run(ctx)
+        })
+
+        require.NoError(t, err, "running restore")
         require.NotEmpty(t, r.Results, "has results after run")
         assert.Equal(t, r.Status, operations.Completed, "restore status")
         assert.Less(t, 0, r.Results.ItemsRead, "items read")
@@ -244,6 +282,7 @@ func TestRepositoryLoadTestOneDriveSuite(t *testing.T) {
 func (suite *RepositoryLoadTestOneDriveSuite) SetupSuite() {
     t := suite.T()
+    t.Skip("temp issue-902-live")
     t.Parallel()
     suite.ctx, suite.repo, suite.acct, suite.st = initM365Repo(t)
 }
@@ -268,8 +307,6 @@ func (suite *RepositoryLoadTestOneDriveSuite) TestOneDrive() {
         service = "one_drive"
     )

-    t.Skip("temp issue-902-live")
-
     m356User := tester.M365UserID(t)

     // backup
diff --git a/src/pkg/selectors/scopes.go b/src/pkg/selectors/scopes.go
index ba8035941..554634411 100644
--- a/src/pkg/selectors/scopes.go
+++ b/src/pkg/selectors/scopes.go
@@ -2,6 +2,7 @@ package selectors
 import (
     "context"
+    "runtime/trace"

     "github.com/alcionai/corso/src/pkg/backup/details"
     "github.com/alcionai/corso/src/pkg/filters"
@@ -210,6 +211,8 @@ func reduce[T scopeT, C categoryT](
     s Selector,
     dataCategories map[path.CategoryType]C,
 ) *details.Details {
+    defer trace.StartRegion(ctx, "selectors:reduce").End()
+
     if deets == nil {
         return nil
     }
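
A note on the tracing changes that make up most of this diff: runtime/trace regions bracket a unit of work so it shows up as a named span in `go tool trace`, and trace.Log attaches a key/value event inside the enclosing region. A minimal, self-contained sketch of the same pattern follows; the package, function, and region names are illustrative, not taken from this PR:

package main

import (
    "context"
    "log"
    "os"
    "runtime/trace"
)

func restoreOne(ctx context.Context, id string) {
    // End the region when the function returns, mirroring the
    // `defer trace.StartRegion(ctx, ...).End()` calls in the diff.
    defer trace.StartRegion(ctx, "demo:restoreOne").End()

    // Attach the item ID as an event inside the region so individual
    // items can be located in the trace viewer.
    trace.Log(ctx, "demo:restoreOne:item", id)
}

func main() {
    f, err := os.Create("trace.out")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    if err := trace.Start(f); err != nil {
        log.Fatal(err)
    }
    defer trace.Stop()

    ctx := context.Background()
    for _, id := range []string{"a", "b", "c"} {
        restoreOne(ctx, id)
    }
    // Inspect the output with: go tool trace trace.out
}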
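The Exchange and OneDrive restore refactors share one shape: the per-collection loop moves into a restoreCollection helper, errors flow through an errUpdater closure owned by the caller, and an unconditional for/select replaces the old exit flag while honoring context cancellation. A stripped-down sketch of that shape under simplified types — item, process, and errors.Join are stand-ins for the repo's data.Stream items and support.WrapAndAppend, not its actual API:

package main

import (
    "context"
    "errors"
    "fmt"
)

type item struct{ id string }

// restoreCollection drains items until the channel closes or the context
// is canceled. Failures are reported through errUpdater rather than a
// returned error, so the caller owns accumulation. It returns attempts,
// successes, and whether the run was canceled.
func restoreCollection(
    ctx context.Context,
    items <-chan item,
    process func(item) error,
    errUpdater func(id string, err error),
) (int, int, bool) {
    var attempts, successes int

    for {
        select {
        case <-ctx.Done():
            errUpdater("context canceled", ctx.Err())
            return attempts, successes, true

        case it, ok := <-items:
            if !ok { // channel closed: collection fully consumed
                return attempts, successes, false
            }
            attempts++

            if err := process(it); err != nil {
                errUpdater(it.id, err)
                continue
            }
            successes++
        }
    }
}

func main() {
    items := make(chan item, 3)
    for _, id := range []string{"a", "bad", "b"} {
        items <- item{id}
    }
    close(items)

    var errs error
    errUpdater := func(id string, err error) {
        errs = errors.Join(errs, fmt.Errorf("%s: %w", id, err))
    }

    process := func(it item) error {
        if it.id == "bad" {
            return errors.New("boom")
        }
        return nil
    }

    a, s, canceled := restoreCollection(context.Background(), items, process, errUpdater)
    fmt.Println(a, s, canceled, errs) // 3 2 false bad: boom
}

Returning a canceled flag instead of folding cancellation into the error keeps the caller's loop control (break out of the collection range) separate from per-item failure accounting, which appears to be why the PR threads the same boolean through both restoreCollection helpers.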
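The load-test changes wrap each operation in pprof.Do, which runs the closure with the given labels attached to the goroutine so CPU profile samples can later be filtered by label. A minimal sketch using only the standard library; the label keys and values here are illustrative:

package main

import (
    "context"
    "fmt"
    "runtime/pprof"
)

func run(ctx context.Context) error {
    // Anything this goroutine does while the pprof.Do closure runs is
    // sampled with the labels attached below.
    v, _ := pprof.Label(ctx, "restore_load_test")
    fmt.Println("running with label:", v)
    return nil
}

func main() {
    var err error

    labels := pprof.Labels("restore_load_test", "exchange")

    // pprof.Do attaches the labels for the duration of the closure;
    // err is captured from the closure, as the tests in this diff do.
    pprof.Do(context.Background(), labels, func(ctx context.Context) {
        err = run(ctx)
    })

    fmt.Println(err)
}

Capturing err from the closure keeps the require/assert calls outside the labeled scope, so only the operation under test is attributed to the label in the profile.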