Speed up Exchange backup by parallelizing url fetch (#1608)

## Description

Rough numbers suggest this speeds up the backup of an account with ~3000 emails, ~1000 contacts, and ~1000 events from ~18 minutes to under 3 minutes.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

<!-- Can reference multiple issues. Use one of the "magic words" ("closes", "fixes") to auto-close the GitHub issue. -->
* https://github.com/alcionai/corso/issues/362
* https://github.com/alcionai/corso/issues/1595
* https://github.com/alcionai/corso/pull/1607

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] ⚡ Unit test
- [x] 💚 E2E
This commit is contained in:
Abin Simon 2022-12-01 10:16:24 +05:30 committed by GitHub
parent 58e70166d4
commit 784db9e12e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,6 +8,9 @@ import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
kw "github.com/microsoft/kiota-serialization-json-go"
@ -33,6 +36,10 @@ var (
const (
collectionChannelBufferSize = 1000
numberOfRetries = 4
// Outlook allows at most 4 concurrent requests
// https://learn.microsoft.com/en-us/graph/throttling-limits#outlook-service-limits
urlPrefetchChannelBufferSize = 4
)
// Collection implements the interface from data.Collection
@ -115,11 +122,11 @@ func (col *Collection) populateByOptionIdentifier(
) {
var (
errs error
success int
success int64
totalBytes int64
wg sync.WaitGroup
user = col.user
objectWriter = kw.NewJsonSerializationWriter()
user = col.user
)
colProgress, closer := observe.CollectionProgress(user, col.fullPath.Category().String(), col.fullPath.Folder())
@ -127,7 +134,7 @@ func (col *Collection) populateByOptionIdentifier(
defer func() {
close(colProgress)
col.finishPopulation(ctx, success, totalBytes, errs)
col.finishPopulation(ctx, int(success), totalBytes, errs)
}()
// get QueryBasedonIdentifier
@ -139,34 +146,61 @@ func (col *Collection) populateByOptionIdentifier(
return
}
for _, identifier := range col.jobs {
response, err := query(ctx, col.service, user, identifier)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
// Limit the max number of active requests to GC
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
defer close(semaphoreCh)
if col.service.ErrPolicy() {
break
}
continue
}
byteCount, err := serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
if col.service.ErrPolicy() {
break
}
continue
}
success++
totalBytes += int64(byteCount)
colProgress <- struct{}{}
errUpdater := func(user string, err error) {
errs = support.WrapAndAppend(user, err, errs)
}
for _, identifier := range col.jobs {
if col.service.ErrPolicy() && errs != nil {
break
}
semaphoreCh <- struct{}{}
wg.Add(1)
go func(identifier string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
var (
response absser.Parsable
err error
)
for i := 1; i <= numberOfRetries; i++ {
response, err = query(ctx, col.service, user, identifier)
if err == nil {
break
}
// TODO: Tweak sleep times
if i < numberOfRetries {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
}
}
if err != nil {
errUpdater(user, err)
return
}
byteCount, err := serializeFunc(ctx, col.service.Client(), kw.NewJsonSerializationWriter(), col.data, response, user)
if err != nil {
errUpdater(user, err)
return
}
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, int64(byteCount))
colProgress <- struct{}{}
}(identifier)
}
wg.Wait()
}
// terminatePopulateSequence is a utility function used to close a Collection's data channel