diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index 03c302461..606c1987c 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -360,7 +360,7 @@ func (c Mail) Serialize( func MailInfo(msg models.Messageable) *details.ExchangeInfo { var ( - sender = graph.UnwrapEmailAddress(msg.GetSender()) + sender = UnwrapEmailAddress(msg.GetSender()) subject = ptr.Val(msg.GetSubject()) received = ptr.Val(msg.GetReceivedDateTime()) created = ptr.Val(msg.GetCreatedDateTime()) @@ -370,7 +370,7 @@ func MailInfo(msg models.Messageable) *details.ExchangeInfo { if msg.GetToRecipients() != nil { ppl := msg.GetToRecipients() for _, entry := range ppl { - temp := graph.UnwrapEmailAddress(entry) + temp := UnwrapEmailAddress(entry) if len(temp) > 0 { recipients = append(recipients, temp) } @@ -387,3 +387,12 @@ func MailInfo(msg models.Messageable) *details.ExchangeInfo { Modified: ptr.OrNow(msg.GetLastModifiedDateTime()), } } + +func UnwrapEmailAddress(contact models.Recipientable) string { + var empty string + if contact == nil || contact.GetEmailAddress() == nil { + return empty + } + + return ptr.Val(contact.GetEmailAddress().GetAddress()) +} diff --git a/src/internal/connector/graph/middleware.go b/src/internal/connector/graph/middleware.go new file mode 100644 index 000000000..01b5a3275 --- /dev/null +++ b/src/internal/connector/graph/middleware.go @@ -0,0 +1,373 @@ +package graph + +import ( + "context" + "fmt" + "net/http" + "net/http/httputil" + "os" + "strconv" + "strings" + "time" + + "github.com/alcionai/clues" + backoff "github.com/cenkalti/backoff/v4" + khttp "github.com/microsoft/kiota-http-go" + "golang.org/x/time/rate" + + "github.com/alcionai/corso/src/internal/common/pii" + "github.com/alcionai/corso/src/internal/events" + "github.com/alcionai/corso/src/pkg/logger" +) + +// --------------------------------------------------------------------------- +// Logging +// --------------------------------------------------------------------------- + +// LoggingMiddleware can be used to log the http request sent by the graph client +type LoggingMiddleware struct{} + +// well-known path names used by graph api calls +// used to un-hide path elements in a pii.SafeURL +var SafeURLPathParams = pii.MapWithPlurals( + //nolint:misspell + "alltime", + "analytics", + "archive", + "beta", + "calendargroup", + "calendar", + "calendarview", + "channel", + "childfolder", + "children", + "clone", + "column", + "contactfolder", + "contact", + "contenttype", + "delta", + "drive", + "event", + "group", + "inbox", + "instance", + "invitation", + "item", + "joinedteam", + "label", + "list", + "mailfolder", + "member", + "message", + "notification", + "page", + "primarychannel", + "root", + "security", + "site", + "subscription", + "team", + "unarchive", + "user", + "v1.0") + +// well-known safe query parameters used by graph api calls +// +// used to un-hide query params in a pii.SafeURL +var SafeURLQueryParams = map[string]struct{}{ + "deltatoken": {}, + "startdatetime": {}, + "enddatetime": {}, + "$count": {}, + "$expand": {}, + "$filter": {}, + "$select": {}, + "$top": {}, +} + +func LoggableURL(url string) pii.SafeURL { + return pii.SafeURL{ + URL: url, + SafePathElems: SafeURLPathParams, + SafeQueryKeys: SafeURLQueryParams, + } +} + +func (handler *LoggingMiddleware) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + ctx := clues.Add( + req.Context(), + "method", req.Method, + "url", LoggableURL(req.URL.String()), + "request_len", req.ContentLength) + + // call the next middleware + resp, err := pipeline.Next(req, middlewareIndex) + + if strings.Contains(req.URL.String(), "users//") { + logger.Ctx(ctx).Error("malformed request url: missing resource") + } + + if resp == nil { + return resp, err + } + + ctx = clues.Add(ctx, "status", resp.Status, "statusCode", resp.StatusCode) + log := logger.Ctx(ctx) + + // Return immediately if the response is good (2xx). + // If api logging is toggled, log a body-less dump of the request/resp. + if (resp.StatusCode / 100) == 2 { + if logger.DebugAPI || os.Getenv(log2xxGraphRequestsEnvKey) != "" { + log.Debugw("2xx graph api resp", "response", getRespDump(ctx, resp, os.Getenv(log2xxGraphResponseEnvKey) != "")) + } + + return resp, err + } + + // Log errors according to api debugging configurations. + // When debugging is toggled, every non-2xx is recorded with a response dump. + // Otherwise, throttling cases and other non-2xx responses are logged + // with a slimmer reference for telemetry/supportability purposes. + if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" { + log.Errorw("non-2xx graph api response", "response", getRespDump(ctx, resp, true)) + return resp, err + } + + msg := fmt.Sprintf("graph api error: %s", resp.Status) + + // special case for supportability: log all throttling cases. + if resp.StatusCode == http.StatusTooManyRequests { + log = log.With( + "limit", resp.Header.Get(rateLimitHeader), + "remaining", resp.Header.Get(rateRemainingHeader), + "reset", resp.Header.Get(rateResetHeader), + "retry-after", resp.Header.Get(retryAfterHeader)) + } else if resp.StatusCode/100 == 4 || resp.StatusCode == http.StatusServiceUnavailable { + log = log.With("response", getRespDump(ctx, resp, true)) + } + + log.Info(msg) + + return resp, err +} + +func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string { + respDump, err := httputil.DumpResponse(resp, getBody) + if err != nil { + logger.CtxErr(ctx, err).Error("dumping http response") + } + + return string(respDump) +} + +// --------------------------------------------------------------------------- +// Retry & Backoff +// --------------------------------------------------------------------------- + +// RetryHandler handles transient HTTP responses and retries the request given the retry options +type RetryHandler struct { + // The maximum number of times a request can be retried + MaxRetries int + // The delay in seconds between retries + Delay time.Duration +} + +func (middleware RetryHandler) retryRequest( + ctx context.Context, + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, + resp *http.Response, + executionCount int, + cumulativeDelay time.Duration, + exponentialBackoff *backoff.ExponentialBackOff, + respErr error, +) (*http.Response, error) { + if (respErr != nil || middleware.isRetriableErrorCode(req, resp.StatusCode)) && + middleware.isRetriableRequest(req) && + executionCount < middleware.MaxRetries { + executionCount++ + + delay := middleware.getRetryDelay(req, resp, exponentialBackoff) + + cumulativeDelay += delay + + req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount)) + + timer := time.NewTimer(delay) + + select { + case <-ctx.Done(): + // Don't retry if the context is marked as done, it will just error out + // when we attempt to send the retry anyway. + return resp, ctx.Err() + + // Will exit switch-block so the remainder of the code doesn't need to be + // indented. + case <-timer.C: + } + + response, err := pipeline.Next(req, middlewareIndex) + if err != nil && !IsErrTimeout(err) { + return response, Stack(ctx, err).With("retry_count", executionCount) + } + + return middleware.retryRequest(ctx, + pipeline, + middlewareIndex, + req, + response, + executionCount, + cumulativeDelay, + exponentialBackoff, + err) + } + + if respErr != nil { + return nil, Stack(ctx, respErr).With("retry_count", executionCount) + } + + return resp, nil +} + +func (middleware RetryHandler) isRetriableErrorCode(req *http.Request, code int) bool { + return code == http.StatusInternalServerError || code == http.StatusServiceUnavailable +} + +func (middleware RetryHandler) isRetriableRequest(req *http.Request) bool { + isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH" + if isBodiedMethod && req.Body != nil { + return req.ContentLength != -1 + } + + return true +} + +func (middleware RetryHandler) getRetryDelay( + req *http.Request, + resp *http.Response, + exponentialBackoff *backoff.ExponentialBackOff, +) time.Duration { + var retryAfter string + if resp != nil { + retryAfter = resp.Header.Get(retryAfterHeader) + } + + if retryAfter != "" { + retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64) + if err == nil { + return time.Duration(retryAfterDelay) * time.Second + } + } // TODO parse the header if it's a date + + return exponentialBackoff.NextBackOff() +} + +// Intercept implements the interface and evaluates whether to retry a failed request. +func (middleware RetryHandler) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + ctx := req.Context() + + response, err := pipeline.Next(req, middlewareIndex) + if err != nil && !IsErrTimeout(err) { + return response, Stack(ctx, err) + } + + exponentialBackOff := backoff.NewExponentialBackOff() + exponentialBackOff.InitialInterval = middleware.Delay + exponentialBackOff.Reset() + + response, err = middleware.retryRequest( + ctx, + pipeline, + middlewareIndex, + req, + response, + 0, + 0, + exponentialBackOff, + err) + if err != nil { + return nil, Stack(ctx, err) + } + + return response, nil +} + +// We're trying to keep calls below the 10k-per-10-minute threshold. +// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes, +// which is a bit below the mark. +// But suppose we have a minute-long dry spell followed by a 10 minute tsunami. +// We'll have built up 900 tokens in reserve, so the first 900 calls go through +// immediately. Over the next 10 minutes, we'll partition out the other calls +// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if +// the volume keeps up after that, we'll always stay between 9000 and 9900 out +// of 10k. +const ( + perSecond = 15 + maxCap = 900 +) + +// Single, global rate limiter at this time. Refinements for method (creates, +// versus reads) or service can come later. +var limiter = rate.NewLimiter(perSecond, maxCap) + +// QueueRequest will allow the request to occur immediately if we're under the +// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until +// the next token set is available. +func QueueRequest(ctx context.Context) { + if err := limiter.Wait(ctx); err != nil { + logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + } +} + +// --------------------------------------------------------------------------- +// Rate Limiting +// --------------------------------------------------------------------------- + +// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min +// request limits. +type ThrottleControlMiddleware struct{} + +func (handler *ThrottleControlMiddleware) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + QueueRequest(req.Context()) + return pipeline.Next(req, middlewareIndex) +} + +// MetricsMiddleware aggregates per-request metrics on the events bus +type MetricsMiddleware struct{} + +func (handler *MetricsMiddleware) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + var ( + start = time.Now() + resp, err = pipeline.Next(req, middlewareIndex) + status = "nil-resp" + ) + + if resp != nil { + status = resp.Status + } + + events.Inc(events.APICall) + events.Inc(events.APICall, status) + events.Since(start, events.APICall) + events.Since(start, events.APICall, status) + + return resp, err +} diff --git a/src/internal/connector/graph/retry_middleware.go b/src/internal/connector/graph/retry_middleware.go deleted file mode 100644 index 33c228523..000000000 --- a/src/internal/connector/graph/retry_middleware.go +++ /dev/null @@ -1,114 +0,0 @@ -package graph - -import ( - "context" - "net/http" - "strconv" - "time" - - backoff "github.com/cenkalti/backoff/v4" - khttp "github.com/microsoft/kiota-http-go" -) - -// --------------------------------------------------------------------------- -// Client Middleware -// --------------------------------------------------------------------------- - -// RetryHandler handles transient HTTP responses and retries the request given the retry options -type RetryHandler struct { - // The maximum number of times a request can be retried - MaxRetries int - // The delay in seconds between retries - Delay time.Duration -} - -func (middleware RetryHandler) retryRequest( - ctx context.Context, - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, - resp *http.Response, - executionCount int, - cumulativeDelay time.Duration, - exponentialBackoff *backoff.ExponentialBackOff, - respErr error, -) (*http.Response, error) { - if (respErr != nil || middleware.isRetriableErrorCode(req, resp.StatusCode)) && - middleware.isRetriableRequest(req) && - executionCount < middleware.MaxRetries { - executionCount++ - - delay := middleware.getRetryDelay(req, resp, exponentialBackoff) - - cumulativeDelay += delay - - timer := time.NewTimer(delay) - - select { - case <-ctx.Done(): - // Don't retry if the context is marked as done, it will just error out - // when we attempt to send the retry anyway. - return resp, ctx.Err() - - // Will exit switch-block so the remainder of the code doesn't need to be - // indented. - case <-timer.C: - } - - req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount)) - - response, err := pipeline.Next(req, middlewareIndex) - if err != nil && !IsErrTimeout(err) { - return response, Stack(ctx, err).With("retry_count", executionCount) - } - - return middleware.retryRequest(ctx, - pipeline, - middlewareIndex, - req, - response, - executionCount, - cumulativeDelay, - exponentialBackoff, - err) - } - - if respErr != nil { - return nil, Stack(ctx, respErr).With("retry_count", executionCount) - } - - return resp, nil -} - -func (middleware RetryHandler) isRetriableErrorCode(req *http.Request, code int) bool { - return code == http.StatusInternalServerError || code == http.StatusServiceUnavailable -} - -func (middleware RetryHandler) isRetriableRequest(req *http.Request) bool { - isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH" - if isBodiedMethod && req.Body != nil { - return req.ContentLength != -1 - } - - return true -} - -func (middleware RetryHandler) getRetryDelay( - req *http.Request, - resp *http.Response, - exponentialBackoff *backoff.ExponentialBackOff, -) time.Duration { - var retryAfter string - if resp != nil { - retryAfter = resp.Header.Get(retryAfterHeader) - } - - if retryAfter != "" { - retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64) - if err == nil { - return time.Duration(retryAfterDelay) * time.Second - } - } // TODO parse the header if it's a date - - return exponentialBackoff.NextBackOff() -} diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 33dda9fd0..9c5ed7d21 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -1,28 +1,18 @@ package graph import ( - "context" - "fmt" "net/http" - "net/http/httputil" - "os" - "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/alcionai/clues" - backoff "github.com/cenkalti/backoff/v4" "github.com/microsoft/kiota-abstractions-go/serialization" kauth "github.com/microsoft/kiota-authentication-azure-go" khttp "github.com/microsoft/kiota-http-go" msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" - "golang.org/x/time/rate" - "github.com/alcionai/corso/src/internal/common/pii" - "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/pkg/account" - "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -52,6 +42,19 @@ type QueryParams struct { Credentials account.M365Config } +// --------------------------------------------------------------------------- +// Interfaces +// --------------------------------------------------------------------------- + +type Servicer interface { + // Client() returns msgraph Service client that can be used to process and execute + // the majority of the queries to the M365 Backstore + Client() *msgraphsdkgo.GraphServiceClient + // Adapter() returns GraphRequest adapter used to process large requests, create batches + // and page iterators + Adapter() *msgraphsdkgo.GraphRequestAdapter +} + // --------------------------------------------------------------------------- // Service Handler // --------------------------------------------------------------------------- @@ -95,7 +98,59 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) { } // --------------------------------------------------------------------------- -// Helpers +// Adapter +// --------------------------------------------------------------------------- + +// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library +// with Azure identity package. An adapter object is a necessary to component +// to create *msgraphsdk.GraphServiceClient +func CreateAdapter( + tenant, client, secret string, + opts ...option, +) (*msgraphsdkgo.GraphRequestAdapter, error) { + // Client Provider: Uses Secret for access to tenant-level data + cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil) + if err != nil { + return nil, clues.Wrap(err, "creating m365 client identity") + } + + auth, err := kauth.NewAzureIdentityAuthenticationProviderWithScopes( + cred, + []string{"https://graph.microsoft.com/.default"}, + ) + if err != nil { + return nil, clues.Wrap(err, "creating azure authentication") + } + + httpClient := HTTPClient(opts...) + + return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( + auth, + nil, nil, + httpClient) +} + +// HTTPClient creates the httpClient with middlewares and timeout configured +// +// Re-use of http clients is critical, or else we leak OS resources +// and consume relatively unbound socket connections. It is important +// to centralize this client to be passed downstream where api calls +// can utilize it on a per-download basis. +func HTTPClient(opts ...option) *http.Client { + clientOptions := msgraphsdkgo.GetDefaultClientOptions() + clientconfig := (&clientConfig{}).populate(opts...) + noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig() + middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay) + httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) + httpClient.Timeout = defaultHTTPClientTimeout + + clientconfig.apply(httpClient) + + return httpClient +} + +// --------------------------------------------------------------------------- +// HTTP Client Config // --------------------------------------------------------------------------- type clientConfig struct { @@ -167,53 +222,9 @@ func MinimumBackoff(dur time.Duration) option { } } -// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library -// with Azure identity package. An adapter object is a necessary to component -// to create *msgraphsdk.GraphServiceClient -func CreateAdapter( - tenant, client, secret string, - opts ...option, -) (*msgraphsdkgo.GraphRequestAdapter, error) { - // Client Provider: Uses Secret for access to tenant-level data - cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil) - if err != nil { - return nil, clues.Wrap(err, "creating m365 client identity") - } - - auth, err := kauth.NewAzureIdentityAuthenticationProviderWithScopes( - cred, - []string{"https://graph.microsoft.com/.default"}, - ) - if err != nil { - return nil, clues.Wrap(err, "creating azure authentication") - } - - httpClient := HTTPClient(opts...) - - return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( - auth, - nil, nil, - httpClient) -} - -// HTTPClient creates the httpClient with middlewares and timeout configured -// -// Re-use of http clients is critical, or else we leak OS resources -// and consume relatively unbound socket connections. It is important -// to centralize this client to be passed downstream where api calls -// can utilize it on a per-download basis. -func HTTPClient(opts ...option) *http.Client { - clientOptions := msgraphsdkgo.GetDefaultClientOptions() - clientconfig := (&clientConfig{}).populate(opts...) - noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig() - middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay) - httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) - httpClient.Timeout = defaultHTTPClientTimeout - - clientconfig.apply(httpClient) - - return httpClient -} +// --------------------------------------------------------------------------- +// Middleware Control +// --------------------------------------------------------------------------- // GetDefaultMiddlewares creates a new default set of middlewares for the Kiota request adapter func GetMiddlewares(maxRetry int, delay time.Duration) []khttp.Middleware { @@ -252,261 +263,3 @@ func GetKiotaMiddlewares( return resultMiddlewares } - -// --------------------------------------------------------------------------- -// Interfaces -// --------------------------------------------------------------------------- - -type Servicer interface { - // Client() returns msgraph Service client that can be used to process and execute - // the majority of the queries to the M365 Backstore - Client() *msgraphsdkgo.GraphServiceClient - // Adapter() returns GraphRequest adapter used to process large requests, create batches - // and page iterators - Adapter() *msgraphsdkgo.GraphRequestAdapter -} - -// --------------------------------------------------------------------------- -// Client Middleware -// --------------------------------------------------------------------------- - -// LoggingMiddleware can be used to log the http request sent by the graph client -type LoggingMiddleware struct{} - -// well-known path names used by graph api calls -// used to un-hide path elements in a pii.SafeURL -var SafeURLPathParams = pii.MapWithPlurals( - //nolint:misspell - "alltime", - "analytics", - "archive", - "beta", - "calendargroup", - "calendar", - "calendarview", - "channel", - "childfolder", - "children", - "clone", - "column", - "contactfolder", - "contact", - "contenttype", - "delta", - "drive", - "event", - "group", - "inbox", - "instance", - "invitation", - "item", - "joinedteam", - "label", - "list", - "mailfolder", - "member", - "message", - "notification", - "page", - "primarychannel", - "root", - "security", - "site", - "subscription", - "team", - "unarchive", - "user", - "v1.0") - -// well-known safe query parameters used by graph api calls -// -// used to un-hide query params in a pii.SafeURL -var SafeURLQueryParams = map[string]struct{}{ - "deltatoken": {}, - "startdatetime": {}, - "enddatetime": {}, - "$count": {}, - "$expand": {}, - "$filter": {}, - "$select": {}, - "$top": {}, -} - -func LoggableURL(url string) pii.SafeURL { - return pii.SafeURL{ - URL: url, - SafePathElems: SafeURLPathParams, - SafeQueryKeys: SafeURLQueryParams, - } -} - -func (handler *LoggingMiddleware) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - ctx := clues.Add( - req.Context(), - "method", req.Method, - "url", LoggableURL(req.URL.String()), - "request_len", req.ContentLength) - - // call the next middleware - resp, err := pipeline.Next(req, middlewareIndex) - - if strings.Contains(req.URL.String(), "users//") { - logger.Ctx(ctx).Error("malformed request url: missing resource") - } - - if resp == nil { - return resp, err - } - - ctx = clues.Add(ctx, "status", resp.Status, "statusCode", resp.StatusCode) - log := logger.Ctx(ctx) - - // Return immediately if the response is good (2xx). - // If api logging is toggled, log a body-less dump of the request/resp. - if (resp.StatusCode / 100) == 2 { - if logger.DebugAPI || os.Getenv(log2xxGraphRequestsEnvKey) != "" { - log.Debugw("2xx graph api resp", "response", getRespDump(ctx, resp, os.Getenv(log2xxGraphResponseEnvKey) != "")) - } - - return resp, err - } - - // Log errors according to api debugging configurations. - // When debugging is toggled, every non-2xx is recorded with a response dump. - // Otherwise, throttling cases and other non-2xx responses are logged - // with a slimmer reference for telemetry/supportability purposes. - if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" { - log.Errorw("non-2xx graph api response", "response", getRespDump(ctx, resp, true)) - return resp, err - } - - msg := fmt.Sprintf("graph api error: %s", resp.Status) - - // special case for supportability: log all throttling cases. - if resp.StatusCode == http.StatusTooManyRequests { - log = log.With( - "limit", resp.Header.Get(rateLimitHeader), - "remaining", resp.Header.Get(rateRemainingHeader), - "reset", resp.Header.Get(rateResetHeader), - "retry-after", resp.Header.Get(retryAfterHeader)) - } else if resp.StatusCode/100 == 4 || resp.StatusCode == http.StatusServiceUnavailable { - log = log.With("response", getRespDump(ctx, resp, true)) - } - - log.Info(msg) - - return resp, err -} - -func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string { - respDump, err := httputil.DumpResponse(resp, getBody) - if err != nil { - logger.CtxErr(ctx, err).Error("dumping http response") - } - - return string(respDump) -} - -// Intercept implements the interface and evaluates whether to retry a failed request. -func (middleware RetryHandler) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - ctx := req.Context() - - response, err := pipeline.Next(req, middlewareIndex) - if err != nil && !IsErrTimeout(err) { - return response, Stack(ctx, err) - } - - exponentialBackOff := backoff.NewExponentialBackOff() - exponentialBackOff.InitialInterval = middleware.Delay - exponentialBackOff.Reset() - - response, err = middleware.retryRequest( - ctx, - pipeline, - middlewareIndex, - req, - response, - 0, - 0, - exponentialBackOff, - err) - if err != nil { - return nil, Stack(ctx, err) - } - - return response, nil -} - -// We're trying to keep calls below the 10k-per-10-minute threshold. -// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes, -// which is a bit below the mark. -// But suppose we have a minute-long dry spell followed by a 10 minute tsunami. -// We'll have built up 900 tokens in reserve, so the first 900 calls go through -// immediately. Over the next 10 minutes, we'll partition out the other calls -// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if -// the volume keeps up after that, we'll always stay between 9000 and 9900 out -// of 10k. -const ( - perSecond = 15 - maxCap = 900 -) - -// Single, global rate limiter at this time. Refinements for method (creates, -// versus reads) or service can come later. -var limiter = rate.NewLimiter(perSecond, maxCap) - -// QueueRequest will allow the request to occur immediately if we're under the -// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until -// the next token set is available. -func QueueRequest(ctx context.Context) { - if err := limiter.Wait(ctx); err != nil { - logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") - } -} - -// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min -// request limits. -type ThrottleControlMiddleware struct{} - -func (handler *ThrottleControlMiddleware) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - QueueRequest(req.Context()) - return pipeline.Next(req, middlewareIndex) -} - -// MetricsMiddleware aggregates per-request metrics on the events bus -type MetricsMiddleware struct{} - -func (handler *MetricsMiddleware) Intercept( - pipeline khttp.Pipeline, - middlewareIndex int, - req *http.Request, -) (*http.Response, error) { - var ( - start = time.Now() - resp, err = pipeline.Next(req, middlewareIndex) - status = "nil-resp" - ) - - if resp != nil { - status = resp.Status - } - - events.Inc(events.APICall) - events.Inc(events.APICall, status) - events.Since(start, events.APICall) - events.Since(start, events.APICall, status) - - return resp, err -} diff --git a/src/internal/connector/graph/shared.go b/src/internal/connector/graph/shared.go deleted file mode 100644 index 208f50420..000000000 --- a/src/internal/connector/graph/shared.go +++ /dev/null @@ -1,19 +0,0 @@ -package graph - -// Contains helper methods that are common across multiple -// Microsoft Graph Applications. - -import ( - "github.com/microsoftgraph/msgraph-sdk-go/models" - - "github.com/alcionai/corso/src/internal/common/ptr" -) - -func UnwrapEmailAddress(contact models.Recipientable) string { - var empty string - if contact == nil || contact.GetEmailAddress() == nil { - return empty - } - - return ptr.Val(contact.GetEmailAddress().GetAddress()) -}