diff --git a/CHANGELOG.md b/CHANGELOG.md index dcfecc3ec..3d49a12c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove exchange item filtering based on m365 item ID via the CLI. - OneDrive backups no longer include a user's non-default drives. - OneDrive and SharePoint file downloads will properly redirect from 3xx responses. +- Refined oneDrive rate limiter controls to reduce throttling errors. ## [v0.7.0] (beta) - 2023-05-02 diff --git a/src/internal/connector/graph/concurrency_limiter.go b/src/internal/connector/graph/concurrency_limiter.go deleted file mode 100644 index 6fe1ea0cd..000000000 --- a/src/internal/connector/graph/concurrency_limiter.go +++ /dev/null @@ -1,53 +0,0 @@ -package graph - -import ( - "net/http" - "sync" - - "github.com/alcionai/clues" - khttp "github.com/microsoft/kiota-http-go" -) - -// concurrencyLimiter middleware limits the number of concurrent requests to graph API -type concurrencyLimiter struct { - semaphore chan struct{} -} - -var ( - once sync.Once - concurrencyLim *concurrencyLimiter - maxConcurrentRequests = 4 -) - -func generateConcurrencyLimiter(capacity int) *concurrencyLimiter { - if capacity < 1 || capacity > maxConcurrentRequests { - capacity = maxConcurrentRequests - } - - return &concurrencyLimiter{ - semaphore: make(chan struct{}, capacity), - } -} - -func InitializeConcurrencyLimiter(capacity int) { - once.Do(func() { - concurrencyLim = generateConcurrencyLimiter(capacity) - }) -} - -func (cl *concurrencyLimiter) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - if cl == nil || cl.semaphore == nil { - return nil, clues.New("nil concurrency limiter") - } - - cl.semaphore <- struct{}{} - defer func() { - <-cl.semaphore - }() - - return pipeline.Next(req, middlewareIndex) -} diff --git a/src/internal/connector/graph/concurrency_middleware.go b/src/internal/connector/graph/concurrency_middleware.go new file mode 100644 index 000000000..2756a60c6 --- /dev/null +++ b/src/internal/connector/graph/concurrency_middleware.go @@ -0,0 +1,202 @@ +package graph + +import ( + "context" + "net/http" + "sync" + + "github.com/alcionai/clues" + khttp "github.com/microsoft/kiota-http-go" + "golang.org/x/time/rate" + + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +// --------------------------------------------------------------------------- +// Concurrency Limiter +// "how many calls at one time" +// --------------------------------------------------------------------------- + +// concurrencyLimiter middleware limits the number of concurrent requests to graph API +type concurrencyLimiter struct { + semaphore chan struct{} +} + +var ( + once sync.Once + concurrencyLim *concurrencyLimiter + maxConcurrentRequests = 4 +) + +func generateConcurrencyLimiter(capacity int) *concurrencyLimiter { + if capacity < 1 || capacity > maxConcurrentRequests { + capacity = maxConcurrentRequests + } + + return &concurrencyLimiter{ + semaphore: make(chan struct{}, capacity), + } +} + +func InitializeConcurrencyLimiter(capacity int) { + once.Do(func() { + concurrencyLim = generateConcurrencyLimiter(capacity) + }) +} + +func (cl *concurrencyLimiter) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + if cl == nil || cl.semaphore == nil { + return nil, clues.New("nil concurrency limiter") + } + + cl.semaphore <- 
struct{}{} + defer func() { + <-cl.semaphore + }() + + return pipeline.Next(req, middlewareIndex) +} + +//nolint:lll +// --------------------------------------------------------------------------- +// Rate Limiter +// "how many calls in a minute" +// https://learn.microsoft.com/en-us/sharepoint/dev/general-development/how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online +// --------------------------------------------------------------------------- + +const ( + // Default goal is to keep calls below the 10k-per-10-minute threshold. + // 14 tokens every second nets 840 per minute. That's 8400 every 10 minutes, + // which is a bit below the mark. + // But suppose we have a minute-long dry spell followed by a 10 minute tsunami. + // We'll have built up 750 tokens in reserve, so the first 750 calls go through + // immediately. Over the next 10 minutes, we'll partition out the other calls + // at a rate of 840-per-minute, ending at a total of 9150. Theoretically, if + // the volume keeps up after that, we'll always stay between 8400 and 9150 out + // of 10k. Worst case scenario, we have an extra minute of padding to allow + // up to 9990. + defaultPerSecond = 14 // 14 * 60 = 840 + defaultMaxCap = 750 // real cap is 10k-per-10-minutes + // since drive runs on a per-minute, rather than per-10-minute bucket, we have + // to keep the max cap equal to the per-second cap. A large maxCap pool (say, + // 1200, similar to the per-minute cap) would allow us to make a flood of 2400 + // calls in the first minute, putting us over the per-minute limit. Keeping + // the cap at the per-second burst means we only dole out a max of 1240 in one + // minute (20 cap + 1200 per minute + one burst of padding). + drivePerSecond = 20 // 20 * 60 = 1200 + driveMaxCap = 20 // real cap is 1250-per-minute +) + +var ( + driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) + // also used as the exchange service limiter + defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) +) + +type LimiterCfg struct { + Service path.ServiceType +} + +type limiterCfgKey string + +const limiterCfgCtxKey limiterCfgKey = "corsoGaphRateLimiterCfg" + +func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context { + return context.WithValue(ctx, limiterCfgCtxKey, lc) +} + +func ctxLimiter(ctx context.Context) *rate.Limiter { + lc, ok := extractRateLimiterConfig(ctx) + if !ok { + return defaultLimiter + } + + switch lc.Service { + case path.OneDriveService, path.SharePointService: + return driveLimiter + default: + return defaultLimiter + } +} + +func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) { + l := ctx.Value(limiterCfgCtxKey) + if l == nil { + return LimiterCfg{}, false + } + + lc, ok := l.(LimiterCfg) + + return lc, ok +} + +type limiterConsumptionKey string + +const limiterConsumptionCtxKey limiterConsumptionKey = "corsoGraphRateLimiterConsumption" + +const ( + defaultLC = 1 + driveDefaultLC = 2 + // limit consumption rate for single-item GETs requests, + // or delta-based multi-item GETs. + SingleGetOrDeltaLC = 1 + // limit consumption rate for anything permissions related + PermissionsLC = 5 +) + +// ConsumeNTokens ensures any calls using this context will consume +// n rate-limiter tokens. Default is 1, and this value does not need +// to be established in the context to consume the default tokens. +// This should only get used on a per-call basis, to avoid cross-pollination. 
+func ConsumeNTokens(ctx context.Context, n int) context.Context { + return context.WithValue(ctx, limiterConsumptionCtxKey, n) +} + +func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { + l := ctx.Value(limiterConsumptionCtxKey) + if l == nil { + return defaultConsumption + } + + lc, ok := l.(int) + if !ok || lc < 1 { + return defaultConsumption + } + + return lc +} + +// QueueRequest will allow the request to occur immediately if we're under the +// calls-per-minute rate. Otherwise, the call will wait in a queue until +// the next token set is available. +func QueueRequest(ctx context.Context) { + limiter := ctxLimiter(ctx) + defaultConsumed := defaultLC + + if limiter == driveLimiter { + defaultConsumed = driveDefaultLC + } + + consume := ctxLimiterConsumption(ctx, defaultConsumed) + + if err := limiter.WaitN(ctx, consume); err != nil { + logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + } +} + +// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits. +type RateLimiterMiddleware struct{} + +func (mw *RateLimiterMiddleware) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + QueueRequest(req.Context()) + return pipeline.Next(req, middlewareIndex) +} diff --git a/src/internal/connector/graph/concurrency_limiter_test.go b/src/internal/connector/graph/concurrency_middleware_test.go similarity index 100% rename from src/internal/connector/graph/concurrency_limiter_test.go rename to src/internal/connector/graph/concurrency_middleware_test.go diff --git a/src/internal/connector/graph/http_wrapper.go b/src/internal/connector/graph/http_wrapper.go index 55e9f9556..b0bca76e2 100644 --- a/src/internal/connector/graph/http_wrapper.go +++ b/src/internal/connector/graph/http_wrapper.go @@ -147,7 +147,7 @@ func internalMiddleware(cc *clientConfig) []khttp.Middleware { }, khttp.NewRedirectHandler(), &LoggingMiddleware{}, - &ThrottleControlMiddleware{}, + &RateLimiterMiddleware{}, &MetricsMiddleware{}, } diff --git a/src/internal/connector/graph/middleware.go b/src/internal/connector/graph/middleware.go index 8730067e7..108f03cac 100644 --- a/src/internal/connector/graph/middleware.go +++ b/src/internal/connector/graph/middleware.go @@ -14,12 +14,10 @@ import ( backoff "github.com/cenkalti/backoff/v4" khttp "github.com/microsoft/kiota-http-go" "golang.org/x/exp/slices" - "golang.org/x/time/rate" "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/pkg/logger" - "github.com/alcionai/corso/src/pkg/path" ) type nexter interface { @@ -381,100 +379,6 @@ func (mw RetryMiddleware) getRetryDelay( return exponentialBackoff.NextBackOff() } -const ( - // Default goal is to keep calls below the 10k-per-10-minute threshold. - // 14 tokens every second nets 840 per minute. That's 8400 every 10 minutes, - // which is a bit below the mark. - // But suppose we have a minute-long dry spell followed by a 10 minute tsunami. - // We'll have built up 750 tokens in reserve, so the first 750 calls go through - // immediately. Over the next 10 minutes, we'll partition out the other calls - // at a rate of 840-per-minute, ending at a total of 9150. Theoretically, if - // the volume keeps up after that, we'll always stay between 8400 and 9150 out - // of 10k. Worst case scenario, we have an extra minute of padding to allow - // up to 9990. 
- defaultPerSecond = 14 // 14 * 60 = 840 - defaultMaxCap = 750 // real cap is 10k-per-10-minutes - // since drive runs on a per-minute, rather than per-10-minute bucket, we have - // to keep the max cap equal to the per-second cap. A large maxCap pool (say, - // 1200, similar to the per-minute cap) would allow us to make a flood of 2400 - // calls in the first minute, putting us over the per-minute limit. Keeping - // the cap at the per-second burst means we only dole out a max of 1240 in one - // minute (20 cap + 1200 per minute + one burst of padding). - drivePerSecond = 20 // 20 * 60 = 1200 - driveMaxCap = 20 // real cap is 1250-per-minute -) - -var ( - driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) - // also used as the exchange service limiter - defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) -) - -type LimiterCfg struct { - Service path.ServiceType -} - -type limiterCfgKey string - -const limiterCfgCtxKey limiterCfgKey = "corsoGraphRateLimiterCfg" - -func ctxLimiter(ctx context.Context) *rate.Limiter { - lc, ok := extractRateLimiterConfig(ctx) - if !ok { - return defaultLimiter - } - - switch lc.Service { - case path.OneDriveService, path.SharePointService: - return driveLimiter - default: - return defaultLimiter - } -} - -func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context { - return context.WithValue(ctx, limiterCfgCtxKey, lc) -} - -func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) { - l := ctx.Value(limiterCfgCtxKey) - if l == nil { - return LimiterCfg{}, false - } - - lc, ok := l.(LimiterCfg) - - return lc, ok -} - -// QueueRequest will allow the request to occur immediately if we're under the -// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until -// the next token set is available. -func QueueRequest(ctx context.Context) { - limiter := ctxLimiter(ctx) - - if err := limiter.Wait(ctx); err != nil { - logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") - } -} - -// --------------------------------------------------------------------------- -// Rate Limiting -// --------------------------------------------------------------------------- - -// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min -// request limits. 
-type ThrottleControlMiddleware struct{} - -func (mw *ThrottleControlMiddleware) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - QueueRequest(req.Context()) - return pipeline.Next(req, middlewareIndex) -} - // --------------------------------------------------------------------------- // Metrics // --------------------------------------------------------------------------- diff --git a/src/internal/connector/graph/middleware_test.go b/src/internal/connector/graph/middleware_test.go index 6ca660231..3aa77778c 100644 --- a/src/internal/connector/graph/middleware_test.go +++ b/src/internal/connector/graph/middleware_test.go @@ -292,3 +292,47 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() { }) } } + +func (suite *MiddlewareUnitSuite) TestLimiterConsumption() { + ctx, flush := tester.NewContext() + defer flush() + + // an unpopulated ctx should produce the default consumption + assert.Equal(suite.T(), defaultLC, ctxLimiterConsumption(ctx, defaultLC)) + + table := []struct { + name string + n int + expect int + }{ + { + name: "matches default", + n: defaultLC, + expect: defaultLC, + }, + { + name: "default+1", + n: defaultLC + 1, + expect: defaultLC + 1, + }, + { + name: "zero", + n: 0, + expect: defaultLC, + }, + { + name: "negative", + n: -1, + expect: defaultLC, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + tctx := ConsumeNTokens(ctx, test.n) + lc := ctxLimiterConsumption(tctx, defaultLC) + assert.Equal(t, test.expect, lc) + }) + } +} diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index e05838793..dc5129ac4 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -264,7 +264,7 @@ func kiotaMiddlewares( khttp.NewParametersNameDecodingHandler(), khttp.NewUserAgentHandler(), &LoggingMiddleware{}, - &ThrottleControlMiddleware{}, + &RateLimiterMiddleware{}, &MetricsMiddleware{}, }...) diff --git a/src/internal/connector/onedrive/api/drive.go b/src/internal/connector/onedrive/api/drive.go index 8d0b1571f..d87546830 100644 --- a/src/internal/connector/onedrive/api/drive.go +++ b/src/internal/connector/onedrive/api/drive.go @@ -373,7 +373,10 @@ func GetDriveRoot( srv graph.Servicer, driveID string, ) (models.DriveItemable, error) { - root, err := srv.Client().DrivesById(driveID).Root().Get(ctx, nil) + root, err := srv.Client(). + DrivesById(driveID). + Root(). 
+ Get(ctx, nil) if err != nil { return nil, graph.Wrap(ctx, err, "getting drive root") } diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index b34f860da..c499aac05 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -157,7 +157,8 @@ func collectItems( } for { - page, err := pager.GetPage(ctx) + // assume delta urls here, which allows single-token consumption + page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC)) if graph.IsErrInvalidDelta(err) { logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta) diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index c7cebc8c1..ac992e90a 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -333,7 +333,11 @@ func driveItemWriter( session := drives.NewItemItemsItemCreateUploadSessionPostRequestBody() ctx = clues.Add(ctx, "upload_item_id", itemID) - r, err := service.Client().DrivesById(driveID).ItemsById(itemID).CreateUploadSession().Post(ctx, session, nil) + r, err := service.Client(). + DrivesById(driveID). + ItemsById(itemID). + CreateUploadSession(). + Post(ctx, session, nil) if err != nil { return nil, graph.Wrap(ctx, err, "creating item upload session") } diff --git a/src/internal/connector/onedrive/permission.go b/src/internal/connector/onedrive/permission.go index 7cd4b530d..b67973be0 100644 --- a/src/internal/connector/onedrive/permission.go +++ b/src/internal/connector/onedrive/permission.go @@ -161,7 +161,7 @@ func UpdatePermissions( DrivesById(driveID). ItemsById(itemID). PermissionsById(pid). - Delete(ctx, nil) + Delete(graph.ConsumeNTokens(ctx, graph.PermissionsLC), nil) if err != nil { return graph.Wrap(ctx, err, "removing permissions") } @@ -207,7 +207,11 @@ func UpdatePermissions( pbody.SetRecipients([]models.DriveRecipientable{rec}) - np, err := service.Client().DrivesById(driveID).ItemsById(itemID).Invite().Post(ctx, pbody, nil) + np, err := service.Client(). + DrivesById(driveID). + ItemsById(itemID). + Invite(). + Post(graph.ConsumeNTokens(ctx, graph.PermissionsLC), pbody, nil) if err != nil { return graph.Wrap(ctx, err, "setting permissions") } diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index 013f2ef79..642e9fd32 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -172,7 +172,11 @@ func restoreListItem( newList.SetItems(contents) // Restore to List base to M365 back store - restoredList, err := service.Client().SitesById(siteID).Lists().Post(ctx, newList, nil) + restoredList, err := service. + Client(). + SitesById(siteID). + Lists(). + Post(ctx, newList, nil) if err != nil { return dii, graph.Wrap(ctx, err, "restoring list") }
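
For context on how the pieces introduced above fit together, here is a minimal, hypothetical usage sketch (not part of this diff, and only compilable inside the corso module's connector tree since it imports internal packages): a OneDrive operation binds the drive limiter onto its context once, an expensive permissions call opts into a higher token cost, and QueueRequest blocks until the limiter releases enough tokens. In the real code path QueueRequest is invoked by RateLimiterMiddleware.Intercept rather than directly.

// Hypothetical caller; illustrative only, not part of this diff.
package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/connector/graph"
	"github.com/alcionai/corso/src/pkg/path"
)

func backupDrivePermissions(ctx context.Context) {
	// Bind the drive limiter (20 tokens/sec, burst 20) to this context once,
	// near the start of a OneDrive or SharePoint operation. Contexts without
	// a binding fall back to the default 10k-per-10-minute limiter.
	ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: path.OneDriveService})

	// Mark a permissions call as consuming PermissionsLC (5) tokens; QueueRequest
	// reads this value via ctxLimiterConsumption before waiting on the limiter.
	permCtx := graph.ConsumeNTokens(ctx, graph.PermissionsLC)

	// Normally RateLimiterMiddleware.Intercept calls this on req.Context();
	// calling it directly here only illustrates the blocking wait.
	graph.QueueRequest(permCtx)
}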
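
The drive limiter comment's reasoning (keep the burst equal to the per-second rate so the first minute stays under the 1250-per-minute bucket) can also be sanity-checked outside the diff. Below is a hypothetical stand-alone sketch using golang.org/x/time/rate with the same constants; the 10ms step size and program shape are assumptions for illustration. It admits roughly 1220 single-token calls in the first simulated minute (20 burst + 20/sec), comfortably under the 1250-per-minute service cap.

// Hypothetical sanity check of the drive limiter constants; not part of this diff.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	lim := rate.NewLimiter(20, 20) // drivePerSecond, driveMaxCap

	var (
		start   = time.Now()
		allowed = 0
	)

	// Step through one simulated minute in 10ms increments, counting how many
	// single-token requests the limiter would admit at each point in time.
	for t := start; t.Before(start.Add(time.Minute)); t = t.Add(10 * time.Millisecond) {
		for lim.AllowN(t, 1) {
			allowed++
		}
	}

	fmt.Println("calls admitted in first minute:", allowed) // ~1220
}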