Test rate limiter isolation

This commit is contained in:
Abhishek Pandey 2023-09-27 03:13:15 +05:30
parent adbc7fe03e
commit d792eea773
5 changed files with 121 additions and 21 deletions

View File

@ -314,6 +314,36 @@ func downloadContent(
itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID)
// Run GetItemNTimes & GetItemContentNTimes in parallel
for i := 0; i < 10; i++ {
logger.Ctx(ctx).Debug("-------------------------------------------------------------")
logger.Ctx(ctx).Debug("-------------------------------------------------------------")
var wg sync.WaitGroup
wg.Add(1)
// Launch GetItemNTimes as a goroutine
go func() {
defer wg.Done()
GetItemNTimes(100, ctx, iaag, driveID, item)
}()
wg.Add(1)
go func() {
defer wg.Done()
go GetItemContentNTimes(1000, ctx, iaag, driveID, item)
}()
logger.Ctx(ctx).Debug("waiting for goroutines to finish")
wg.Wait()
// Reset window such that it doesn't impact GetItem() calls
time.Sleep(10 * time.Second)
}
content, err := downloadItem(ctx, iaag, item)
if err == nil {
return content, nil
@ -349,6 +379,60 @@ func downloadContent(
return content, nil
}
func GetItemNTimes(
n int,
ctx context.Context,
iaag itemAndAPIGetter,
driveID string,
item models.DriveItemable,
) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// arbitrary sleep between 0 to 1 seconds
//time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
_, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
logger.CtxErr(ctx, err).Info("Not good. GetItem failing.")
}
}()
}
wg.Wait()
logger.Ctx(ctx).Info("GetItemNTimes- done")
}
func GetItemContentNTimes(
n int,
ctx context.Context,
iaag itemAndAPIGetter,
driveID string,
item models.DriveItemable,
) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := downloadItem(ctx, iaag, item)
if err != nil {
logger.CtxErr(ctx, err).Info("Expected . GetItemContent failing.")
}
}()
}
wg.Wait()
logger.Ctx(ctx).Info("GetItemContentNTimes - done")
}
// readItemContents fetches latest download URL from the cache and attempts to
// download the file using the new URL.
func readItemContents(

View File

@ -86,16 +86,16 @@ const (
// If the bucket is full, we can push out 200 calls immediately, which brings
// the total in the first 10 minutes to 9800. We can toe that line if we want,
// but doing so risks timeouts. It's better to give the limits breathing room.
defaultPerSecond = 16 // 16 * 60 * 10 = 9600
defaultMaxCap = 200 // real cap is 10k-per-10-minutes
defaultPerSecond = 200 // 16 * 60 * 10 = 9600
defaultMaxCap = 2000 // real cap is 10k-per-10-minutes
// since drive runs on a per-minute, rather than per-10-minute bucket, we have
// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
// calls in the first minute, putting us over the per-minute limit. Keeping
// the cap at the per-second burst means we only dole out a max of 1240 in one
// minute (20 cap + 1200 per minute + one burst of padding).
drivePerSecond = 20 // 20 * 60 = 1200
driveMaxCap = 20 // real cap is 1250-per-minute
drivePerSecond = 200 // 20 * 60 = 1200
driveMaxCap = 2000 // real cap is 1250-per-minute
)
var (
@ -150,7 +150,7 @@ const (
// https://learn.microsoft.com/en-us/sharepoint/dev/general-development
// /how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#application-throttling
defaultLC = 1
driveDefaultLC = 2
driveDefaultLC = 1
// limit consumption rate for single-item GETs requests,
// or delta-based multi-item GETs.
SingleGetOrDeltaLC = 1
@ -183,7 +183,8 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
// QueueRequest will allow the request to occur immediately if we're under the
// calls-per-minute rate. Otherwise, the call will wait in a queue until
// the next token set is available.
func QueueRequest(ctx context.Context) {
func QueueRequest(req *http.Request) {
ctx := req.Context()
limiter := ctxLimiter(ctx)
defaultConsumed := defaultLC
@ -193,6 +194,11 @@ func QueueRequest(ctx context.Context) {
consume := ctxLimiterConsumption(ctx, defaultConsumed)
dump := getReqDump(req.Context(), req, true)
logger.Ctx(req.Context()).Infow("rate limiter middleware",
"request", dump,
"consumed", consume)
if err := limiter.WaitN(ctx, consume); err != nil {
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
}
@ -206,7 +212,8 @@ func (mw *RateLimiterMiddleware) Intercept(
middlewareIndex int,
req *http.Request,
) (*http.Response, error) {
QueueRequest(req.Context())
QueueRequest(req)
return pipeline.Next(req, middlewareIndex)
}

View File

@ -178,15 +178,15 @@ func defaultTransport() http.RoundTripper {
func internalMiddleware(cc *clientConfig) []khttp.Middleware {
mw := []khttp.Middleware{
&RetryMiddleware{
MaxRetries: cc.maxRetries,
Delay: cc.minDelay,
},
khttp.NewRetryHandler(),
// &RetryMiddleware{
// MaxRetries: cc.maxRetries,
// Delay: cc.minDelay,
// },
// khttp.NewRetryHandler(),
khttp.NewRedirectHandler(),
&LoggingMiddleware{},
&throttlingMiddleware{newTimedFence()},
&RateLimiterMiddleware{},
//&throttlingMiddleware{newTimedFence()},
//&RateLimiterMiddleware{},
&MetricsMiddleware{},
}

View File

@ -188,6 +188,15 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
return string(respDump)
}
func getReqDump(ctx context.Context, req *http.Request, getBody bool) string {
respDump, err := httputil.DumpRequest(req, getBody)
if err != nil {
logger.CtxErr(ctx, err).Error("dumping http response")
}
return string(respDump)
}
// ---------------------------------------------------------------------------
// Retry & Backoff
// ---------------------------------------------------------------------------

View File

@ -277,11 +277,11 @@ func kiotaMiddlewares(
) []khttp.Middleware {
mw := []khttp.Middleware{
msgraphgocore.NewGraphTelemetryHandler(options),
&RetryMiddleware{
MaxRetries: cc.maxRetries,
Delay: cc.minDelay,
},
khttp.NewRetryHandler(),
// &RetryMiddleware{
// MaxRetries: cc.maxRetries,
// Delay: cc.minDelay,
// },
// khttp.NewRetryHandler(),
khttp.NewRedirectHandler(),
khttp.NewCompressionHandler(),
khttp.NewParametersNameDecodingHandler(),
@ -296,8 +296,8 @@ func kiotaMiddlewares(
mw = append(
mw,
&throttlingMiddleware{newTimedFence()},
&RateLimiterMiddleware{},
//&throttlingMiddleware{newTimedFence()},
//&RateLimiterMiddleware{},
&MetricsMiddleware{})
if len(cc.appendMiddleware) > 0 {