add global throttling fence to graph middleware (#3432)

Adds a graph middleware that blocks all calls for the duration of the most recent retry-after header received in a 429 response. This prevents failures on 429s caused by external pressure, even while the corso client believes it is within its rate-limiting and concurrency quotas.
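
As a rough sketch of the intended semantics (illustrative only; `newTimedFence`, `RaiseFence`, and `Block` are the ones defined in the diff below, while `demoFence` is a hypothetical caller, not part of this PR):

```go
// demoFence (hypothetical) walks through the fence lifecycle.
func demoFence(ctx context.Context) error {
	tf := newTimedFence()

	// A 429 carrying "Retry-After: 5" raises a five-second fence.
	tf.RaiseFence(5 * time.Second)

	// All callers, across all goroutines, block here until the fence
	// drops, or until ctx deadlines, whichever comes first.
	if err := tf.Block(ctx); err != nil {
		return err // ctx deadlined while fenced
	}

	// The fence is down; requests proceed as normal.
	return nil
}
```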

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🤖 Reliability/Supportability/Tests

#### Issue(s)

* #3407

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
4 changed files with 350 additions and 5 deletions


@@ -3,7 +3,9 @@ package graph
import (
"context"
"net/http"
"strconv"
"sync"
"time"
"github.com/alcionai/clues"
khttp "github.com/microsoft/kiota-http-go"
@@ -200,3 +202,144 @@ func (mw *RateLimiterMiddleware) Intercept(
QueueRequest(req.Context())
return pipeline.Next(req, middlewareIndex)
}
// ---------------------------------------------------------------------------
// global throttle fencing
// ---------------------------------------------------------------------------
// timedFence sets up a fence for a certain amount of time.
// The time can be extended arbitrarily. All processes blocked
// at the fence are let through when all timer extensions conclude.
type timedFence struct {
mu sync.Mutex
c chan struct{}
timers map[int64]*time.Timer
}
func newTimedFence() *timedFence {
return &timedFence{
mu: sync.Mutex{},
c: nil,
timers: map[int64]*time.Timer{},
}
}
// Block waits until the fence is let down.
// If no fence is up, it returns immediately.
// Returns an error if the ctx deadlines before the fence is let down.
func (tf *timedFence) Block(ctx context.Context) error {
// set to a local var to avoid race panics from tf.c
// getting set to nil between the conditional check and
// the read case. If c gets closed between those two
// points then the select case will exit immediately,
// as if we didn't block at all.
c := tf.c
if c != nil {
select {
case <-ctx.Done():
return clues.Wrap(ctx.Err(), "blocked on throttling fence")
case <-c:
}
}
return nil
}
// RaiseFence puts up a fence that blocks requests for the provided
// duration. The duration is always added to the current time.
// Multiple calls to RaiseFence are not additive: raising a fence
// for five seconds and then for one second keeps the fence up
// until now+5 seconds, not now+6 seconds. When the last remaining
// timer fires, the fence drops and all currently blocked calls
// are allowed through.
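// For example (illustrative):
//
//	tf.RaiseFence(5 * time.Second)
//	tf.RaiseFence(1 * time.Second)
//	// two timers now run concurrently; the fence drops when the
//	// five-second timer, the last one remaining, fires.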
func (tf *timedFence) RaiseFence(seconds time.Duration) {
tf.mu.Lock()
defer tf.mu.Unlock()
if seconds < 1 {
return
}
if tf.c == nil {
tf.c = make(chan struct{})
}
timer := time.NewTimer(seconds)
tid := time.Now().Add(seconds).UnixMilli()
tf.timers[tid] = timer
go func(c <-chan time.Time, id int64) {
// wait for the timeout
<-c
tf.mu.Lock()
defer tf.mu.Unlock()
// remove the timer
delete(tf.timers, id)
// if no timers remain, close the channel to drop the fence
// and set the fence channel to nil
if len(tf.timers) == 0 && tf.c != nil {
close(tf.c)
tf.c = nil
}
}(timer.C, tid)
}
// throttlingMiddleware is used to ensure we don't overstep request
// limits; it fences all calls whenever a 429 tells us to back off.
type throttlingMiddleware struct {
tf *timedFence
}
func (mw *throttlingMiddleware) Intercept(
pipeline khttp.Pipeline,
middlewareIndex int,
req *http.Request,
) (*http.Response, error) {
err := mw.tf.Block(req.Context())
if err != nil {
return nil, err
}
resp, err := pipeline.Next(req, middlewareIndex)
if resp == nil || err != nil {
return resp, err
}
seconds := getRetryAfterHeader(resp)
if seconds < 1 {
return resp, nil
}
// if all prior conditions pass, we raise a fence that blocks
// all calls, globally, from progressing until the retry-after
// period passes.
mw.tf.RaiseFence(time.Duration(seconds) * time.Second)
return resp, nil
}
func getRetryAfterHeader(resp *http.Response) int {
if resp == nil || len(resp.Header) == 0 {
return -1
}
if resp.StatusCode != http.StatusTooManyRequests {
return -1
}
rah := resp.Header.Get(retryAfterHeader)
if len(rah) == 0 {
return -1
}
seconds, err := strconv.Atoi(rah)
if err != nil {
// the error itself is irrelevant; we only want to wait
// when given a clear length of time to wait for.
return -1
}
return seconds
}


@@ -8,23 +8,25 @@ import (
"testing"
"time"
"github.com/alcionai/clues"
khttp "github.com/microsoft/kiota-http-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"
"github.com/alcionai/corso/src/internal/tester"
)
type ConcurrencyLimiterUnitTestSuite struct {
type ConcurrencyMWUnitTestSuite struct {
tester.Suite
}
func TestConcurrencyLimiterSuite(t *testing.T) {
suite.Run(t, &ConcurrencyLimiterUnitTestSuite{Suite: tester.NewUnitSuite(t)})
suite.Run(t, &ConcurrencyMWUnitTestSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *ConcurrencyLimiterUnitTestSuite) TestConcurrencyLimiter() {
func (suite *ConcurrencyMWUnitTestSuite) TestConcurrencyLimiter() {
t := suite.T()
maxConcurrentRequests := 4
@@ -71,7 +73,7 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestConcurrencyLimiter() {
wg.Wait()
}
func (suite *ConcurrencyLimiterUnitTestSuite) TestInitializeConcurrencyLimiter() {
func (suite *ConcurrencyMWUnitTestSuite) TestInitializeConcurrencyLimiter() {
t := suite.T()
InitializeConcurrencyLimiter(2)
@@ -80,7 +82,7 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestInitializeConcurrencyLimiter()
assert.Equal(t, cap(concurrencyLim.semaphore), 2, "singleton semaphore capacity changed")
}
func (suite *ConcurrencyLimiterUnitTestSuite) TestGenerateConcurrencyLimiter() {
func (suite *ConcurrencyMWUnitTestSuite) TestGenerateConcurrencyLimiter() {
tests := []struct {
name string
cap int
@@ -118,3 +120,201 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestGenerateConcurrencyLimiter() {
})
}
}
func (suite *ConcurrencyMWUnitTestSuite) TestTimedFence_Block() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
tf := newTimedFence()
// raise multiple fences, the longest at 5 seconds
for i := -5; i < 6; i++ {
tf.RaiseFence(time.Duration(i) * time.Second)
}
// -5..0 get dropped, 1..5 get added
assert.Len(t, tf.timers, 5)
start := time.Now()
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
err := tf.Block(ctx)
require.NoError(t, err, clues.ToCore(err))
}(i)
}
wg.Wait()
// should block for 5 seconds. comparing to 4 to avoid
// race condition flakes.
assert.Less(t, 4.0, time.Since(start).Seconds())
}
func (suite *ConcurrencyMWUnitTestSuite) TestTimedFence_Block_ctxDeadline() {
ctx, flush := tester.NewContext()
defer flush()
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(2*time.Second))
defer cancel()
t := suite.T()
tf := newTimedFence()
// raise multiple fences, the longest at 10 seconds
for i := 1; i < 6; i++ {
tf.RaiseFence(time.Duration(i*2) * time.Second)
}
start := time.Now()
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
err := tf.Block(ctx)
// should error from ctx deadline
require.Error(t, err, clues.ToCore(err))
}(i)
}
wg.Wait()
// should block for 2 seconds. comparing to 3 to avoid
// race condition flakes.
assert.Greater(t, 3.0, time.Since(start).Seconds())
}
type mockPipeline struct {
resp *http.Response
err error
}
func (mp mockPipeline) Next(*http.Request, int) (*http.Response, error) {
return mp.resp, mp.err
}
func (suite *ConcurrencyMWUnitTestSuite) TestThrottlingMiddleware() {
retryAfterNan := http.Header{}
retryAfterNan.Set(retryAfterHeader, "brunhuldi")
retryAfterNeg1 := http.Header{}
retryAfterNeg1.Set(retryAfterHeader, "-1")
retryAfter0 := http.Header{}
retryAfter0.Set(retryAfterHeader, "0")
retryAfter5 := http.Header{}
retryAfter5.Set(retryAfterHeader, "5")
goodPipe := mockPipeline{
resp: &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{},
},
}
table := []struct {
name string
pipeline mockPipeline
expectMinWait float64
}{
{
name: "2xx response",
pipeline: goodPipe,
expectMinWait: 0,
},
{
name: "non-429 response",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusBadGateway,
Header: retryAfter5,
},
},
expectMinWait: 0,
},
{
name: "429 response w/out retry header",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: http.Header{},
},
},
expectMinWait: 0,
},
{
name: "429 response w/ nan retry-after",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: retryAfterNan,
},
},
expectMinWait: 0,
},
{
name: "429 response w/ negative retry-after",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: retryAfterNeg1,
},
},
expectMinWait: 0,
},
{
name: "429 response w/ zero retry-after",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: retryAfter0,
},
},
expectMinWait: 0,
},
{
name: "429 response w/ positive retry-after",
pipeline: mockPipeline{
resp: &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: retryAfter5,
},
},
expectMinWait: 4,
},
}
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
tm := throttlingMiddleware{newTimedFence()}
req := &http.Request{}
req = req.WithContext(ctx)
start := time.Now()
_, err := tm.Intercept(test.pipeline, 0, req)
require.NoError(t, err, clues.ToCore(err))
_, err = tm.Intercept(goodPipe, 0, req)
require.NoError(t, err, clues.ToCore(err))
assert.Less(t, test.expectMinWait, time.Since(start).Seconds())
})
}
}


@@ -147,6 +147,7 @@ func internalMiddleware(cc *clientConfig) []khttp.Middleware {
},
khttp.NewRedirectHandler(),
&LoggingMiddleware{},
&throttlingMiddleware{newTimedFence()},
&RateLimiterMiddleware{},
&MetricsMiddleware{},
}


@@ -268,6 +268,7 @@ func kiotaMiddlewares(
mw = append(
mw,
&throttlingMiddleware{newTimedFence()},
&RateLimiterMiddleware{},
&MetricsMiddleware{})