diff --git a/src/internal/connector/graph/http_wrapper_test.go b/src/internal/connector/graph/http_wrapper_test.go
index d5edaf27d..40abea977 100644
--- a/src/internal/connector/graph/http_wrapper_test.go
+++ b/src/internal/connector/graph/http_wrapper_test.go
@@ -88,10 +88,12 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() {
 	// and thus skip all the middleware
 	hdr := http.Header{}
 	hdr.Set("Location", "localhost:99999999/smarfs")
+
 	toResp := &http.Response{
 		StatusCode: 302,
 		Header:     hdr,
 	}
+
 	mwResp := mwForceResp{
 		resp: toResp,
 		alternate: func(req *http.Request) (bool, *http.Response, error) {
diff --git a/src/internal/connector/graph/middleware.go b/src/internal/connector/graph/middleware.go
index 004798cad..988644db7 100644
--- a/src/internal/connector/graph/middleware.go
+++ b/src/internal/connector/graph/middleware.go
@@ -360,24 +360,31 @@ func (mw RetryMiddleware) getRetryDelay(
 	return exponentialBackoff.NextBackOff()
 }
 
-// We're trying to keep calls below the 10k-per-10-minute threshold.
-// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes,
-// which is a bit below the mark.
-// But suppose we have a minute-long dry spell followed by a 10 minute tsunami.
-// We'll have built up 900 tokens in reserve, so the first 900 calls go through
-// immediately. Over the next 10 minutes, we'll partition out the other calls
-// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if
-// the volume keeps up after that, we'll always stay between 9000 and 9900 out
-// of 10k.
 const (
-	defaultPerSecond = 15
-	defaultMaxCap    = 900
-	drivePerSecond   = 15
-	driveMaxCap      = 1100
+	// Default goal is to keep calls below the 10k-per-10-minute threshold.
+	// 14 tokens every second nets 840 per minute. That's 8400 every 10 minutes,
+	// which is a bit below the mark.
+	// But suppose we have a minute-long dry spell followed by a 10 minute tsunami.
+	// We'll have built up 750 tokens in reserve, so the first 750 calls go through
+	// immediately. Over the next 10 minutes, we'll partition out the other calls
+	// at a rate of 840-per-minute, ending at a total of 9150. Theoretically, if
+	// the volume keeps up after that, we'll always stay between 8400 and 9150 out
+	// of 10k. Worst case scenario, we have an extra minute of padding to allow
+	// up to 9990.
+	defaultPerSecond = 14  // 14 * 60 = 840
+	defaultMaxCap    = 750 // real cap is 10k-per-10-minutes
+	// since drive runs on a per-minute, rather than per-10-minute bucket, we have
+	// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
+	// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
+	// calls in the first minute, putting us over the per-minute limit. Keeping
+	// the cap at the per-second burst means we only dole out a max of 1240 in one
+	// minute (20 cap + 1200 per minute + one burst of padding).
+	drivePerSecond = 20 // 20 * 60 = 1200
+	driveMaxCap    = 20 // real cap is 1250-per-minute
 )
 
 var (
-	driveLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
+	driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
 	// also used as the exchange service limiter
 	defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
 )
@@ -454,6 +461,8 @@ func (mw *ThrottleControlMiddleware) Intercept(
 // MetricsMiddleware aggregates per-request metrics on the events bus
 type MetricsMiddleware struct{}
 
+const xmruHeader = "x-ms-resource-unit"
+
 func (mw *MetricsMiddleware) Intercept(
 	pipeline khttp.Pipeline,
 	middlewareIndex int,
@@ -474,5 +483,18 @@ func (mw *MetricsMiddleware) Intercept(
 	events.Since(start, events.APICall)
 	events.Since(start, events.APICall, status)
 
+	// track the graph "resource cost" for each call (if not provided, assume 1)
+
+	// from msoft throttling documentation:
+	// x-ms-resource-unit - Indicates the resource unit used for this request. Values are positive integer
+	xmru := resp.Header.Get(xmruHeader)
+	xmrui, e := strconv.Atoi(xmru)
+
+	if len(xmru) == 0 || e != nil {
+		xmrui = 1
+	}
+
+	events.IncN(xmrui, events.APICall, xmruHeader)
+
 	return resp, err
 }
diff --git a/src/internal/events/events.go b/src/internal/events/events.go
index 47a15f5e9..f900c50c4 100644
--- a/src/internal/events/events.go
+++ b/src/internal/events/events.go
@@ -188,10 +188,12 @@ func tenantHash(tenID string) string {
 // metrics aggregation
 // ---------------------------------------------------------------------------
 
-type m string
+type metricsCategory string
 
 // metrics collection bucket
-const APICall m = "api_call"
+const (
+	APICall metricsCategory = "api_call"
+)
 
 // configurations
 const (
@@ -256,13 +258,19 @@ func dumpMetrics(ctx context.Context, stop <-chan struct{}, sig *metrics.InmemSi
 }
 
 // Inc increments the given category by 1.
-func Inc(cat m, keys ...string) {
+func Inc(cat metricsCategory, keys ...string) {
 	cats := append([]string{string(cat)}, keys...)
 	metrics.IncrCounter(cats, 1)
 }
 
+// IncN increments the given category by N.
+func IncN(n int, cat metricsCategory, keys ...string) {
+	cats := append([]string{string(cat)}, keys...)
+	metrics.IncrCounter(cats, float32(n))
+}
+
 // Since records the duration between the provided time and now, in millis.
-func Since(start time.Time, cat m, keys ...string) {
+func Since(start time.Time, cat metricsCategory, keys ...string) {
 	cats := append([]string{string(cat)}, keys...)
 	metrics.MeasureSince(cats, start)
 }
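
Reviewer note, not part of the patch: the new limiter comment does its arithmetic in prose, so here is a minimal sketch that re-checks the same worst-case bound under standard golang.org/x/time/rate token-bucket semantics (a full burst released up front, then refill at the sustained rate for the rest of the window). The `worstCase` helper and the `main` function are illustrative only.

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// worstCase returns the maximum number of events a limiter can admit in a
// window of windowSeconds when it starts with a full bucket: one full burst
// plus refill at the sustained rate over the window.
func worstCase(l *rate.Limiter, windowSeconds float64) float64 {
	return float64(l.Burst()) + float64(l.Limit())*windowSeconds
}

func main() {
	// constants from the patch: default (graph-wide) and drive limiters
	defaultLimiter := rate.NewLimiter(14, 750) // target: under 10k per 10 minutes
	driveLimiter := rate.NewLimiter(20, 20)    // target: under 1250 per minute

	fmt.Printf("default, 10 min window: %.0f calls (cap 10000)\n", worstCase(defaultLimiter, 600)) // 9150
	fmt.Printf("drive, 1 min window:    %.0f calls (cap 1250)\n", worstCase(driveLimiter, 60))     // 1220
}
```

This reproduces the 9150-of-10k figure in the default-limiter comment; for drive it gives 1220 per minute from the bucket itself (the comment's 1240 adds one extra burst of padding on top), which is why the burst has to stay at the per-second rate rather than anything near the per-minute cap.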
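Also not part of the patch: a standalone sketch of the new x-ms-resource-unit handling added to `MetricsMiddleware.Intercept`, assuming the fallback behavior shown in the hunk above (a missing or malformed header counts as a cost of 1). The `resourceCost` helper name is hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

const xmruHeader = "x-ms-resource-unit"

// resourceCost reports the graph "resource cost" of a response, defaulting
// to 1 when the header is absent or not a valid integer.
func resourceCost(resp *http.Response) int {
	xmru := resp.Header.Get(xmruHeader)

	cost, err := strconv.Atoi(xmru)
	if len(xmru) == 0 || err != nil {
		return 1
	}

	return cost
}

func main() {
	resp := &http.Response{Header: http.Header{}}
	fmt.Println(resourceCost(resp)) // 1: header missing, assume unit cost

	resp.Header.Set(xmruHeader, "5")
	fmt.Println(resourceCost(resp)) // 5: cost reported by graph
}
```

In the patch itself this value feeds `events.IncN`, so the per-call metric grows by the reported resource units rather than by a flat 1 per request.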