From 784db9e12e0df2df95f17aea6cbc89708445b1a9 Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Thu, 1 Dec 2022 10:16:24 +0530 Subject: [PATCH] Speed up Exchange backup by parallelizing url fetch (#1608) ## Description From rough numbers we can speedup an account with ~3000 emails, ~1000 contacts and ~1000 events from ~18m to <3m. ## Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :hamster: Trivial/Minor ## Issue(s) * https://github.com/alcionai/corso/issues/362 * https://github.com/alcionai/corso/issues/1595 * https://github.com/alcionai/corso/pull/1607 ## Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [x] :green_heart: E2E --- .../exchange/exchange_data_collection.go | 94 +++++++++++++------ 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index f960ab2a5..a99b7cd18 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -8,6 +8,9 @@ import ( "context" "fmt" "io" + "sync" + "sync/atomic" + "time" absser "github.com/microsoft/kiota-abstractions-go/serialization" kw "github.com/microsoft/kiota-serialization-json-go" @@ -33,6 +36,10 @@ var ( const ( collectionChannelBufferSize = 1000 numberOfRetries = 4 + + // Outlooks expects max 4 concurrent requests + // https://learn.microsoft.com/en-us/graph/throttling-limits#outlook-service-limits + urlPrefetchChannelBufferSize = 4 ) // Collection implements the interface from data.Collection @@ -115,11 +122,11 @@ func (col *Collection) populateByOptionIdentifier( ) { var ( errs error - success int + success int64 totalBytes int64 + wg sync.WaitGroup - user = col.user - objectWriter = kw.NewJsonSerializationWriter() + user = col.user ) colProgress, closer := observe.CollectionProgress(user, col.fullPath.Category().String(), col.fullPath.Folder()) @@ -127,7 +134,7 @@ func (col *Collection) populateByOptionIdentifier( defer func() { close(colProgress) - col.finishPopulation(ctx, success, totalBytes, errs) + col.finishPopulation(ctx, int(success), totalBytes, errs) }() // get QueryBasedonIdentifier @@ -139,34 +146,61 @@ func (col *Collection) populateByOptionIdentifier( return } - for _, identifier := range col.jobs { - response, err := query(ctx, col.service, user, identifier) - if err != nil { - errs = support.WrapAndAppendf(user, err, errs) + // Limit the max number of active requests to GC + semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + defer close(semaphoreCh) - if col.service.ErrPolicy() { - break - } - - continue - } - - byteCount, err := serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user) - if err != nil { - errs = support.WrapAndAppendf(user, err, errs) - - if col.service.ErrPolicy() { - break - } - - continue - } - - success++ - - totalBytes += int64(byteCount) - colProgress <- struct{}{} + errUpdater := func(user string, err error) { + errs = support.WrapAndAppend(user, err, errs) } + + for _, identifier := range col.jobs { + if col.service.ErrPolicy() && errs != nil { + break + } + semaphoreCh <- struct{}{} + + wg.Add(1) + + go func(identifier string) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + var ( + response absser.Parsable + err error + ) + + for i := 1; i <= numberOfRetries; i++ { + response, err = query(ctx, col.service, user, identifier) + if err == nil { + break + } + // TODO: Tweak sleep times + if i < numberOfRetries { + time.Sleep(time.Duration(3*(i+1)) * time.Second) + } + } + + if err != nil { + errUpdater(user, err) + return + } + + byteCount, err := serializeFunc(ctx, col.service.Client(), kw.NewJsonSerializationWriter(), col.data, response, user) + if err != nil { + errUpdater(user, err) + return + } + + atomic.AddInt64(&success, 1) + atomic.AddInt64(&totalBytes, int64(byteCount)) + + colProgress <- struct{}{} + }(identifier) + } + + wg.Wait() } // terminatePopulateSequence is a utility function used to close a Collection's data channel