From cc35b1ed97f924630875667eeaf852284f42db17 Mon Sep 17 00:00:00 2001
From: Keepers
Date: Thu, 18 May 2023 12:21:57 -0600
Subject: [PATCH] add global throttling fence to graph middleware (#3432)

Adds a graph middleware that blocks all calls based on the most recent
retry-after header in a 429 response. This prevents us from failing on
429 responses caused by external pressure, even while the Corso client
believes it is within its rate-limiting and concurrency quotas.

---

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :robot: Reliability/Supportability/Tests

#### Issue(s)

* #3407

#### Test Plan

- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 .../connector/graph/concurrency_middleware.go | 143 ++++++++++++
 .../graph/concurrency_middleware_test.go      | 210 +++++++++++++++++-
 src/internal/connector/graph/http_wrapper.go  |   1 +
 src/internal/connector/graph/service.go       |   1 +
 4 files changed, 350 insertions(+), 5 deletions(-)

diff --git a/src/internal/connector/graph/concurrency_middleware.go b/src/internal/connector/graph/concurrency_middleware.go
index 2756a60c6..ba2a08fa6 100644
--- a/src/internal/connector/graph/concurrency_middleware.go
+++ b/src/internal/connector/graph/concurrency_middleware.go
@@ -3,7 +3,9 @@ package graph
 import (
 	"context"
 	"net/http"
+	"strconv"
 	"sync"
+	"time"

 	"github.com/alcionai/clues"
 	khttp "github.com/microsoft/kiota-http-go"
@@ -200,3 +202,144 @@ func (mw *RateLimiterMiddleware) Intercept(
 	QueueRequest(req.Context())
 	return pipeline.Next(req, middlewareIndex)
 }
+
+// ---------------------------------------------------------------------------
+// global throttle fencing
+// ---------------------------------------------------------------------------
+
+// timedFence sets up a fence for a given amount of time.
+// The time can be extended arbitrarily.  All processes blocked at
+// the fence are let through when all timer extensions conclude.
+type timedFence struct {
+	mu     sync.Mutex
+	c      chan struct{}
+	timers map[int64]*time.Timer
+}
+
+func newTimedFence() *timedFence {
+	return &timedFence{
+		mu:     sync.Mutex{},
+		c:      nil,
+		timers: map[int64]*time.Timer{},
+	}
+}
+
+// Block blocks until the fence is let down.
+// If no fence is up, it returns immediately.
+// It returns an error if the ctx deadline expires before the
+// fence is let down.
+func (tf *timedFence) Block(ctx context.Context) error {
+	// set to a local var to avoid race panics from tf.c
+	// getting set to nil between the conditional check and
+	// the read case.  If c gets closed between those two
+	// points then the select case will exit immediately,
+	// as if we hadn't blocked at all.
+	c := tf.c
+
+	if c != nil {
+		select {
+		case <-ctx.Done():
+			return clues.Wrap(ctx.Err(), "blocked on throttling fence")
+		case <-c:
+		}
+	}
+
+	return nil
+}
+
+// RaiseFence puts up a fence to block requests for the provided
+// duration of time.  The duration is always added to the current
+// time.  Multiple calls to RaiseFence are not additive. ie: calling
+// `RaiseFence(5); RaiseFence(1)` will keep the fence up until
+// now+5 seconds, not now+6 seconds.  When the last remaining fence
+// is dropped, all currently blocked calls are allowed through.
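+//
+// A hypothetical usage sketch (the durations are illustrative only):
+//
+//	tf := newTimedFence()
+//	tf.RaiseFence(5 * time.Second) // fence drops at now+5s
+//	tf.RaiseFence(1 * time.Second) // not additive; now+5s still wins
+//	err := tf.Block(ctx)           // releases when the last timer ends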
+func (tf *timedFence) RaiseFence(seconds time.Duration) {
+	tf.mu.Lock()
+	defer tf.mu.Unlock()
+
+	if seconds < 1 {
+		return
+	}
+
+	if tf.c == nil {
+		tf.c = make(chan struct{})
+	}
+
+	timer := time.NewTimer(seconds)
+	tid := time.Now().Add(seconds).UnixMilli()
+	tf.timers[tid] = timer
+
+	go func(c <-chan time.Time, id int64) {
+		// wait for the timeout
+		<-c
+
+		tf.mu.Lock()
+		defer tf.mu.Unlock()
+
+		// remove the timer
+		delete(tf.timers, id)
+
+		// if no timers remain, close the channel to drop the fence
+		// and set the fence channel to nil
+		if len(tf.timers) == 0 && tf.c != nil {
+			close(tf.c)
+			tf.c = nil
+		}
+	}(timer.C, tid)
+}
+
+// throttlingMiddleware is used to ensure we don't overstep
+// per-minute request limits.
+type throttlingMiddleware struct {
+	tf *timedFence
+}
+
+func (mw *throttlingMiddleware) Intercept(
+	pipeline khttp.Pipeline,
+	middlewareIndex int,
+	req *http.Request,
+) (*http.Response, error) {
+	err := mw.tf.Block(req.Context())
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := pipeline.Next(req, middlewareIndex)
+	if resp == nil || err != nil {
+		return resp, err
+	}
+
+	seconds := getRetryAfterHeader(resp)
+	if seconds < 1 {
+		return resp, nil
+	}
+
+	// if all prior conditions pass, we need to raise a fence that
+	// blocks calls, globally, from progressing until the retry-after
+	// duration elapses.
+	mw.tf.RaiseFence(time.Duration(seconds) * time.Second)
+
+	return resp, nil
+}
+
+func getRetryAfterHeader(resp *http.Response) int {
+	if resp == nil || len(resp.Header) == 0 {
+		return -1
+	}
+
+	if resp.StatusCode != http.StatusTooManyRequests {
+		return -1
+	}
+
+	rah := resp.Header.Get(retryAfterHeader)
+	if len(rah) == 0 {
+		return -1
+	}
+
+	seconds, err := strconv.Atoi(rah)
+	if err != nil {
+		// the error itself is irrelevant; we only want to wait
+		// when we have a clear length of time to wait for.
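+		// note: Retry-After may also arrive as an HTTP-date rather
+		// than a seconds count; that form fails Atoi and is treated
+		// here the same as a missing header.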
+		return -1
+	}
+
+	return seconds
+}
diff --git a/src/internal/connector/graph/concurrency_middleware_test.go b/src/internal/connector/graph/concurrency_middleware_test.go
index c5734a665..5e19f791a 100644
--- a/src/internal/connector/graph/concurrency_middleware_test.go
+++ b/src/internal/connector/graph/concurrency_middleware_test.go
@@ -8,23 +8,25 @@ import (
+	"context"
 	"testing"
 	"time"

+	"github.com/alcionai/clues"
 	khttp "github.com/microsoft/kiota-http-go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"

 	"github.com/alcionai/corso/src/internal/tester"
 )

-type ConcurrencyLimiterUnitTestSuite struct {
+type ConcurrencyMWUnitTestSuite struct {
 	tester.Suite
 }

 func TestConcurrencyLimiterSuite(t *testing.T) {
-	suite.Run(t, &ConcurrencyLimiterUnitTestSuite{Suite: tester.NewUnitSuite(t)})
+	suite.Run(t, &ConcurrencyMWUnitTestSuite{Suite: tester.NewUnitSuite(t)})
 }

-func (suite *ConcurrencyLimiterUnitTestSuite) TestConcurrencyLimiter() {
+func (suite *ConcurrencyMWUnitTestSuite) TestConcurrencyLimiter() {
 	t := suite.T()

 	maxConcurrentRequests := 4
@@ -71,7 +73,7 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestConcurrencyLimiter() {
 	wg.Wait()
 }

-func (suite *ConcurrencyLimiterUnitTestSuite) TestInitializeConcurrencyLimiter() {
+func (suite *ConcurrencyMWUnitTestSuite) TestInitializeConcurrencyLimiter() {
 	t := suite.T()

 	InitializeConcurrencyLimiter(2)
@@ -80,7 +82,7 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestInitializeConcurrencyLimiter()
 	assert.Equal(t, cap(concurrencyLim.semaphore), 2, "singleton semaphore capacity changed")
 }

-func (suite *ConcurrencyLimiterUnitTestSuite) TestGenerateConcurrencyLimiter() {
+func (suite *ConcurrencyMWUnitTestSuite) TestGenerateConcurrencyLimiter() {
 	tests := []struct {
 		name string
 		cap  int
@@ -118,3 +120,201 @@ func (suite *ConcurrencyLimiterUnitTestSuite) TestGenerateConcurrencyLimiter() {
 		})
 	}
 }
+
+func (suite *ConcurrencyMWUnitTestSuite) TestTimedFence_Block() {
+	ctx, flush := tester.NewContext()
+	defer flush()
+
+	t := suite.T()
+	tf := newTimedFence()
+
+	// raise multiple fences, the longest at 5 seconds
+	for i := -5; i < 6; i++ {
+		tf.RaiseFence(time.Duration(i) * time.Second)
+	}
+
+	// -5..0 get dropped, 1..5 get added
+	assert.Len(t, tf.timers, 5)
+
+	start := time.Now()
+
+	var wg sync.WaitGroup
+
+	wg.Add(5)
+
+	for i := 0; i < 5; i++ {
+		go func(i int) {
+			defer wg.Done()
+
+			err := tf.Block(ctx)
+			require.NoError(t, err, clues.ToCore(err))
+		}(i)
+	}
+
+	wg.Wait()
+
+	// should block for 5 seconds.  comparing to 4 to avoid
+	// race condition flakes.
+	assert.Less(t, 4.0, time.Since(start).Seconds())
+}
+
+func (suite *ConcurrencyMWUnitTestSuite) TestTimedFence_Block_ctxDeadline() {
+	ctx, flush := tester.NewContext()
+	defer flush()
+
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(2*time.Second))
+	defer cancel()
+
+	t := suite.T()
+	tf := newTimedFence()
+
+	// raise multiple fences, the longest at 10 seconds
+	for i := 1; i < 6; i++ {
+		tf.RaiseFence(time.Duration(i*2) * time.Second)
+	}
+
+	start := time.Now()
+
+	var wg sync.WaitGroup
+
+	wg.Add(5)
+
+	for i := 0; i < 5; i++ {
+		go func(i int) {
+			defer wg.Done()

+			err := tf.Block(ctx)
+			// should error from the ctx deadline
+			require.Error(t, err, clues.ToCore(err))
+		}(i)
+	}
+
+	wg.Wait()
+
+	// should block for only 2 seconds (the ctx deadline).  comparing
+	// to 3 to avoid race condition flakes.
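+	// (assert.Greater(t, a, b) asserts a > b, ie: the elapsed time
+	// stays under 3 seconds)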
+	assert.Greater(t, 3.0, time.Since(start).Seconds())
+}
+
+type mockPipeline struct {
+	resp *http.Response
+	err  error
+}
+
+func (mp mockPipeline) Next(*http.Request, int) (*http.Response, error) {
+	return mp.resp, mp.err
+}
+
+func (suite *ConcurrencyMWUnitTestSuite) TestThrottlingMiddleware() {
+	retryAfterNan := http.Header{}
+	retryAfterNan.Set(retryAfterHeader, "brunhuldi")
+
+	retryAfterNeg1 := http.Header{}
+	retryAfterNeg1.Set(retryAfterHeader, "-1")
+
+	retryAfter0 := http.Header{}
+	retryAfter0.Set(retryAfterHeader, "0")
+
+	retryAfter5 := http.Header{}
+	retryAfter5.Set(retryAfterHeader, "5")
+
+	goodPipe := mockPipeline{
+		resp: &http.Response{
+			StatusCode: http.StatusOK,
+			Header:     http.Header{},
+		},
+	}
+
+	table := []struct {
+		name          string
+		pipeline      mockPipeline
+		expectMinWait float64
+	}{
+		{
+			name:          "2xx response",
+			pipeline:      goodPipe,
+			expectMinWait: 0,
+		},
+		{
+			name: "non-429 response",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusBadGateway,
+					Header:     retryAfter5,
+				},
+			},
+			expectMinWait: 0,
+		},
+		{
+			name: "429 response w/out retry header",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusTooManyRequests,
+					Header:     http.Header{},
+				},
+			},
+			expectMinWait: 0,
+		},
+		{
+			name: "429 response w/ nan retry-after",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusTooManyRequests,
+					Header:     retryAfterNan,
+				},
+			},
+			expectMinWait: 0,
+		},
+		{
+			name: "429 response w/ negative retry-after",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusTooManyRequests,
+					Header:     retryAfterNeg1,
+				},
+			},
+			expectMinWait: 0,
+		},
+		{
+			name: "429 response w/ zero retry-after",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusTooManyRequests,
+					Header:     retryAfter0,
+				},
+			},
+			expectMinWait: 0,
+		},
+		{
+			name: "429 response w/ positive retry-after",
+			pipeline: mockPipeline{
+				resp: &http.Response{
+					StatusCode: http.StatusTooManyRequests,
+					Header:     retryAfter5,
+				},
+			},
+			expectMinWait: 4,
+		},
+	}
+	for _, test := range table {
+		suite.Run(test.name, func() {
+			ctx, flush := tester.NewContext()
+			defer flush()
+
+			t := suite.T()
+			tm := throttlingMiddleware{newTimedFence()}
+
+			req := &http.Request{}
+			req = req.WithContext(ctx)
+
+			start := time.Now()
+
+			_, err := tm.Intercept(test.pipeline, 0, req)
+			require.NoError(t, err, clues.ToCore(err))
+
+			_, err = tm.Intercept(goodPipe, 0, req)
+			require.NoError(t, err, clues.ToCore(err))
+
+			assert.Less(t, test.expectMinWait, time.Since(start).Seconds())
+		})
+	}
+}
diff --git a/src/internal/connector/graph/http_wrapper.go b/src/internal/connector/graph/http_wrapper.go
index b0bca76e2..bc30e88e6 100644
--- a/src/internal/connector/graph/http_wrapper.go
+++ b/src/internal/connector/graph/http_wrapper.go
@@ -147,6 +147,7 @@ func internalMiddleware(cc *clientConfig) []khttp.Middleware {
 		},
 		khttp.NewRedirectHandler(),
 		&LoggingMiddleware{},
+		&throttlingMiddleware{newTimedFence()},
 		&RateLimiterMiddleware{},
 		&MetricsMiddleware{},
 	}
diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go
index f0aa71f08..4335972a4 100644
--- a/src/internal/connector/graph/service.go
+++ b/src/internal/connector/graph/service.go
@@ -268,6 +268,7 @@ func kiotaMiddlewares(
 	mw = append(
 		mw,
+		&throttlingMiddleware{newTimedFence()},
 		&RateLimiterMiddleware{},
 		&MetricsMiddleware{})
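+	// note: kiota executes middleware in slice order, so the throttling
+	// fence above blocks fenced calls before they ever reach the rate
+	// limiter.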