diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go index 0cdf79c0e..35e5de33e 100644 --- a/src/internal/m365/collection/drive/collection.go +++ b/src/internal/m365/collection/drive/collection.go @@ -314,6 +314,36 @@ func downloadContent( itemID := ptr.Val(item.GetId()) ctx = clues.Add(ctx, "item_id", itemID) + // Run GetItemNTimes & GetItemContentNTimes in parallel + + for i := 0; i < 10; i++ { + logger.Ctx(ctx).Debug("-------------------------------------------------------------") + logger.Ctx(ctx).Debug("-------------------------------------------------------------") + + var wg sync.WaitGroup + + wg.Add(1) + + // Launch GetItemNTimes as a goroutine + go func() { + defer wg.Done() + GetItemNTimes(100, ctx, iaag, driveID, item) + }() + + wg.Add(1) + + go func() { + defer wg.Done() + go GetItemContentNTimes(1000, ctx, iaag, driveID, item) + }() + + logger.Ctx(ctx).Debug("waiting for goroutines to finish") + wg.Wait() + + // Reset window such that it doesn't impact GetItem() calls + time.Sleep(10 * time.Second) + } + content, err := downloadItem(ctx, iaag, item) if err == nil { return content, nil @@ -349,6 +379,60 @@ func downloadContent( return content, nil } +func GetItemNTimes( + n int, + ctx context.Context, + iaag itemAndAPIGetter, + driveID string, + item models.DriveItemable, +) { + var wg sync.WaitGroup + + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + // arbitrary sleep between 0 to 1 seconds + //time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + _, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) + if err != nil { + logger.CtxErr(ctx, err).Info("Not good. GetItem failing.") + } + }() + } + + wg.Wait() + + logger.Ctx(ctx).Info("GetItemNTimes- done") +} + +func GetItemContentNTimes( + n int, + ctx context.Context, + iaag itemAndAPIGetter, + driveID string, + item models.DriveItemable, +) { + var wg sync.WaitGroup + + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := downloadItem(ctx, iaag, item) + if err != nil { + logger.CtxErr(ctx, err).Info("Expected . GetItemContent failing.") + } + + }() + } + + wg.Wait() + + logger.Ctx(ctx).Info("GetItemContentNTimes - done") +} + // readItemContents fetches latest download URL from the cache and attempts to // download the file using the new URL. func readItemContents( diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 6d651391e..3e8b12f7d 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -86,16 +86,16 @@ const ( // If the bucket is full, we can push out 200 calls immediately, which brings // the total in the first 10 minutes to 9800. We can toe that line if we want, // but doing so risks timeouts. It's better to give the limits breathing room. - defaultPerSecond = 16 // 16 * 60 * 10 = 9600 - defaultMaxCap = 200 // real cap is 10k-per-10-minutes + defaultPerSecond = 200 // 16 * 60 * 10 = 9600 + defaultMaxCap = 2000 // real cap is 10k-per-10-minutes // since drive runs on a per-minute, rather than per-10-minute bucket, we have // to keep the max cap equal to the per-second cap. A large maxCap pool (say, // 1200, similar to the per-minute cap) would allow us to make a flood of 2400 // calls in the first minute, putting us over the per-minute limit. Keeping // the cap at the per-second burst means we only dole out a max of 1240 in one // minute (20 cap + 1200 per minute + one burst of padding). - drivePerSecond = 20 // 20 * 60 = 1200 - driveMaxCap = 20 // real cap is 1250-per-minute + drivePerSecond = 200 // 20 * 60 = 1200 + driveMaxCap = 2000 // real cap is 1250-per-minute ) var ( @@ -150,7 +150,7 @@ const ( // https://learn.microsoft.com/en-us/sharepoint/dev/general-development // /how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#application-throttling defaultLC = 1 - driveDefaultLC = 2 + driveDefaultLC = 1 // limit consumption rate for single-item GETs requests, // or delta-based multi-item GETs. SingleGetOrDeltaLC = 1 @@ -183,7 +183,8 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { // QueueRequest will allow the request to occur immediately if we're under the // calls-per-minute rate. Otherwise, the call will wait in a queue until // the next token set is available. -func QueueRequest(ctx context.Context) { +func QueueRequest(req *http.Request) { + ctx := req.Context() limiter := ctxLimiter(ctx) defaultConsumed := defaultLC @@ -193,6 +194,11 @@ func QueueRequest(ctx context.Context) { consume := ctxLimiterConsumption(ctx, defaultConsumed) + dump := getReqDump(req.Context(), req, true) + logger.Ctx(req.Context()).Infow("rate limiter middleware", + "request", dump, + "consumed", consume) + if err := limiter.WaitN(ctx, consume); err != nil { logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") } @@ -206,7 +212,8 @@ func (mw *RateLimiterMiddleware) Intercept( middlewareIndex int, req *http.Request, ) (*http.Response, error) { - QueueRequest(req.Context()) + QueueRequest(req) + return pipeline.Next(req, middlewareIndex) } diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index fdffb583f..6e8307edf 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -178,15 +178,15 @@ func defaultTransport() http.RoundTripper { func internalMiddleware(cc *clientConfig) []khttp.Middleware { mw := []khttp.Middleware{ - &RetryMiddleware{ - MaxRetries: cc.maxRetries, - Delay: cc.minDelay, - }, - khttp.NewRetryHandler(), + // &RetryMiddleware{ + // MaxRetries: cc.maxRetries, + // Delay: cc.minDelay, + // }, + // khttp.NewRetryHandler(), khttp.NewRedirectHandler(), &LoggingMiddleware{}, - &throttlingMiddleware{newTimedFence()}, - &RateLimiterMiddleware{}, + //&throttlingMiddleware{newTimedFence()}, + //&RateLimiterMiddleware{}, &MetricsMiddleware{}, } diff --git a/src/internal/m365/graph/middleware.go b/src/internal/m365/graph/middleware.go index 63789ab23..2953c023a 100644 --- a/src/internal/m365/graph/middleware.go +++ b/src/internal/m365/graph/middleware.go @@ -188,6 +188,15 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string return string(respDump) } +func getReqDump(ctx context.Context, req *http.Request, getBody bool) string { + respDump, err := httputil.DumpRequest(req, getBody) + if err != nil { + logger.CtxErr(ctx, err).Error("dumping http response") + } + + return string(respDump) +} + // --------------------------------------------------------------------------- // Retry & Backoff // --------------------------------------------------------------------------- diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index eedda99df..398389ea3 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -277,11 +277,11 @@ func kiotaMiddlewares( ) []khttp.Middleware { mw := []khttp.Middleware{ msgraphgocore.NewGraphTelemetryHandler(options), - &RetryMiddleware{ - MaxRetries: cc.maxRetries, - Delay: cc.minDelay, - }, - khttp.NewRetryHandler(), + // &RetryMiddleware{ + // MaxRetries: cc.maxRetries, + // Delay: cc.minDelay, + // }, + // khttp.NewRetryHandler(), khttp.NewRedirectHandler(), khttp.NewCompressionHandler(), khttp.NewParametersNameDecodingHandler(), @@ -296,8 +296,8 @@ func kiotaMiddlewares( mw = append( mw, - &throttlingMiddleware{newTimedFence()}, - &RateLimiterMiddleware{}, + //&throttlingMiddleware{newTimedFence()}, + //&RateLimiterMiddleware{}, &MetricsMiddleware{}) if len(cc.appendMiddleware) > 0 {