moving code around, no logic changes (#3025)
#### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🧹 Tech Debt/Cleanup #### Issue(s) * #2024 #### Test Plan - [x] ⚡ Unit test
This commit is contained in:
parent
d393bb09a7
commit
da8ac5cdbc
@ -360,7 +360,7 @@ func (c Mail) Serialize(
|
||||
|
||||
func MailInfo(msg models.Messageable) *details.ExchangeInfo {
|
||||
var (
|
||||
sender = graph.UnwrapEmailAddress(msg.GetSender())
|
||||
sender = UnwrapEmailAddress(msg.GetSender())
|
||||
subject = ptr.Val(msg.GetSubject())
|
||||
received = ptr.Val(msg.GetReceivedDateTime())
|
||||
created = ptr.Val(msg.GetCreatedDateTime())
|
||||
@ -370,7 +370,7 @@ func MailInfo(msg models.Messageable) *details.ExchangeInfo {
|
||||
if msg.GetToRecipients() != nil {
|
||||
ppl := msg.GetToRecipients()
|
||||
for _, entry := range ppl {
|
||||
temp := graph.UnwrapEmailAddress(entry)
|
||||
temp := UnwrapEmailAddress(entry)
|
||||
if len(temp) > 0 {
|
||||
recipients = append(recipients, temp)
|
||||
}
|
||||
@ -387,3 +387,12 @@ func MailInfo(msg models.Messageable) *details.ExchangeInfo {
|
||||
Modified: ptr.OrNow(msg.GetLastModifiedDateTime()),
|
||||
}
|
||||
}
|
||||
|
||||
func UnwrapEmailAddress(contact models.Recipientable) string {
|
||||
var empty string
|
||||
if contact == nil || contact.GetEmailAddress() == nil {
|
||||
return empty
|
||||
}
|
||||
|
||||
return ptr.Val(contact.GetEmailAddress().GetAddress())
|
||||
}
|
||||
|
||||
373
src/internal/connector/graph/middleware.go
Normal file
373
src/internal/connector/graph/middleware.go
Normal file
@ -0,0 +1,373 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/pii"
|
||||
"github.com/alcionai/corso/src/internal/events"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Logging
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// LoggingMiddleware can be used to log the http request sent by the graph client
|
||||
type LoggingMiddleware struct{}
|
||||
|
||||
// well-known path names used by graph api calls
|
||||
// used to un-hide path elements in a pii.SafeURL
|
||||
var SafeURLPathParams = pii.MapWithPlurals(
|
||||
//nolint:misspell
|
||||
"alltime",
|
||||
"analytics",
|
||||
"archive",
|
||||
"beta",
|
||||
"calendargroup",
|
||||
"calendar",
|
||||
"calendarview",
|
||||
"channel",
|
||||
"childfolder",
|
||||
"children",
|
||||
"clone",
|
||||
"column",
|
||||
"contactfolder",
|
||||
"contact",
|
||||
"contenttype",
|
||||
"delta",
|
||||
"drive",
|
||||
"event",
|
||||
"group",
|
||||
"inbox",
|
||||
"instance",
|
||||
"invitation",
|
||||
"item",
|
||||
"joinedteam",
|
||||
"label",
|
||||
"list",
|
||||
"mailfolder",
|
||||
"member",
|
||||
"message",
|
||||
"notification",
|
||||
"page",
|
||||
"primarychannel",
|
||||
"root",
|
||||
"security",
|
||||
"site",
|
||||
"subscription",
|
||||
"team",
|
||||
"unarchive",
|
||||
"user",
|
||||
"v1.0")
|
||||
|
||||
// well-known safe query parameters used by graph api calls
|
||||
//
|
||||
// used to un-hide query params in a pii.SafeURL
|
||||
var SafeURLQueryParams = map[string]struct{}{
|
||||
"deltatoken": {},
|
||||
"startdatetime": {},
|
||||
"enddatetime": {},
|
||||
"$count": {},
|
||||
"$expand": {},
|
||||
"$filter": {},
|
||||
"$select": {},
|
||||
"$top": {},
|
||||
}
|
||||
|
||||
func LoggableURL(url string) pii.SafeURL {
|
||||
return pii.SafeURL{
|
||||
URL: url,
|
||||
SafePathElems: SafeURLPathParams,
|
||||
SafeQueryKeys: SafeURLQueryParams,
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *LoggingMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
ctx := clues.Add(
|
||||
req.Context(),
|
||||
"method", req.Method,
|
||||
"url", LoggableURL(req.URL.String()),
|
||||
"request_len", req.ContentLength)
|
||||
|
||||
// call the next middleware
|
||||
resp, err := pipeline.Next(req, middlewareIndex)
|
||||
|
||||
if strings.Contains(req.URL.String(), "users//") {
|
||||
logger.Ctx(ctx).Error("malformed request url: missing resource")
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
ctx = clues.Add(ctx, "status", resp.Status, "statusCode", resp.StatusCode)
|
||||
log := logger.Ctx(ctx)
|
||||
|
||||
// Return immediately if the response is good (2xx).
|
||||
// If api logging is toggled, log a body-less dump of the request/resp.
|
||||
if (resp.StatusCode / 100) == 2 {
|
||||
if logger.DebugAPI || os.Getenv(log2xxGraphRequestsEnvKey) != "" {
|
||||
log.Debugw("2xx graph api resp", "response", getRespDump(ctx, resp, os.Getenv(log2xxGraphResponseEnvKey) != ""))
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Log errors according to api debugging configurations.
|
||||
// When debugging is toggled, every non-2xx is recorded with a response dump.
|
||||
// Otherwise, throttling cases and other non-2xx responses are logged
|
||||
// with a slimmer reference for telemetry/supportability purposes.
|
||||
if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" {
|
||||
log.Errorw("non-2xx graph api response", "response", getRespDump(ctx, resp, true))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("graph api error: %s", resp.Status)
|
||||
|
||||
// special case for supportability: log all throttling cases.
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
log = log.With(
|
||||
"limit", resp.Header.Get(rateLimitHeader),
|
||||
"remaining", resp.Header.Get(rateRemainingHeader),
|
||||
"reset", resp.Header.Get(rateResetHeader),
|
||||
"retry-after", resp.Header.Get(retryAfterHeader))
|
||||
} else if resp.StatusCode/100 == 4 || resp.StatusCode == http.StatusServiceUnavailable {
|
||||
log = log.With("response", getRespDump(ctx, resp, true))
|
||||
}
|
||||
|
||||
log.Info(msg)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
|
||||
respDump, err := httputil.DumpResponse(resp, getBody)
|
||||
if err != nil {
|
||||
logger.CtxErr(ctx, err).Error("dumping http response")
|
||||
}
|
||||
|
||||
return string(respDump)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Retry & Backoff
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// RetryHandler handles transient HTTP responses and retries the request given the retry options
|
||||
type RetryHandler struct {
|
||||
// The maximum number of times a request can be retried
|
||||
MaxRetries int
|
||||
// The delay in seconds between retries
|
||||
Delay time.Duration
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) retryRequest(
|
||||
ctx context.Context,
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
executionCount int,
|
||||
cumulativeDelay time.Duration,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
respErr error,
|
||||
) (*http.Response, error) {
|
||||
if (respErr != nil || middleware.isRetriableErrorCode(req, resp.StatusCode)) &&
|
||||
middleware.isRetriableRequest(req) &&
|
||||
executionCount < middleware.MaxRetries {
|
||||
executionCount++
|
||||
|
||||
delay := middleware.getRetryDelay(req, resp, exponentialBackoff)
|
||||
|
||||
cumulativeDelay += delay
|
||||
|
||||
req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount))
|
||||
|
||||
timer := time.NewTimer(delay)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Don't retry if the context is marked as done, it will just error out
|
||||
// when we attempt to send the retry anyway.
|
||||
return resp, ctx.Err()
|
||||
|
||||
// Will exit switch-block so the remainder of the code doesn't need to be
|
||||
// indented.
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, Stack(ctx, err).With("retry_count", executionCount)
|
||||
}
|
||||
|
||||
return middleware.retryRequest(ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
executionCount,
|
||||
cumulativeDelay,
|
||||
exponentialBackoff,
|
||||
err)
|
||||
}
|
||||
|
||||
if respErr != nil {
|
||||
return nil, Stack(ctx, respErr).With("retry_count", executionCount)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableErrorCode(req *http.Request, code int) bool {
|
||||
return code == http.StatusInternalServerError || code == http.StatusServiceUnavailable
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableRequest(req *http.Request) bool {
|
||||
isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH"
|
||||
if isBodiedMethod && req.Body != nil {
|
||||
return req.ContentLength != -1
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) getRetryDelay(
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
) time.Duration {
|
||||
var retryAfter string
|
||||
if resp != nil {
|
||||
retryAfter = resp.Header.Get(retryAfterHeader)
|
||||
}
|
||||
|
||||
if retryAfter != "" {
|
||||
retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64)
|
||||
if err == nil {
|
||||
return time.Duration(retryAfterDelay) * time.Second
|
||||
}
|
||||
} // TODO parse the header if it's a date
|
||||
|
||||
return exponentialBackoff.NextBackOff()
|
||||
}
|
||||
|
||||
// Intercept implements the interface and evaluates whether to retry a failed request.
|
||||
func (middleware RetryHandler) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, Stack(ctx, err)
|
||||
}
|
||||
|
||||
exponentialBackOff := backoff.NewExponentialBackOff()
|
||||
exponentialBackOff.InitialInterval = middleware.Delay
|
||||
exponentialBackOff.Reset()
|
||||
|
||||
response, err = middleware.retryRequest(
|
||||
ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
0,
|
||||
0,
|
||||
exponentialBackOff,
|
||||
err)
|
||||
if err != nil {
|
||||
return nil, Stack(ctx, err)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// We're trying to keep calls below the 10k-per-10-minute threshold.
|
||||
// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes,
|
||||
// which is a bit below the mark.
|
||||
// But suppose we have a minute-long dry spell followed by a 10 minute tsunami.
|
||||
// We'll have built up 900 tokens in reserve, so the first 900 calls go through
|
||||
// immediately. Over the next 10 minutes, we'll partition out the other calls
|
||||
// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if
|
||||
// the volume keeps up after that, we'll always stay between 9000 and 9900 out
|
||||
// of 10k.
|
||||
const (
|
||||
perSecond = 15
|
||||
maxCap = 900
|
||||
)
|
||||
|
||||
// Single, global rate limiter at this time. Refinements for method (creates,
|
||||
// versus reads) or service can come later.
|
||||
var limiter = rate.NewLimiter(perSecond, maxCap)
|
||||
|
||||
// QueueRequest will allow the request to occur immediately if we're under the
|
||||
// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until
|
||||
// the next token set is available.
|
||||
func QueueRequest(ctx context.Context) {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Rate Limiting
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min
|
||||
// request limits.
|
||||
type ThrottleControlMiddleware struct{}
|
||||
|
||||
func (handler *ThrottleControlMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
QueueRequest(req.Context())
|
||||
return pipeline.Next(req, middlewareIndex)
|
||||
}
|
||||
|
||||
// MetricsMiddleware aggregates per-request metrics on the events bus
|
||||
type MetricsMiddleware struct{}
|
||||
|
||||
func (handler *MetricsMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
var (
|
||||
start = time.Now()
|
||||
resp, err = pipeline.Next(req, middlewareIndex)
|
||||
status = "nil-resp"
|
||||
)
|
||||
|
||||
if resp != nil {
|
||||
status = resp.Status
|
||||
}
|
||||
|
||||
events.Inc(events.APICall)
|
||||
events.Inc(events.APICall, status)
|
||||
events.Since(start, events.APICall)
|
||||
events.Since(start, events.APICall, status)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -1,114 +0,0 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Client Middleware
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// RetryHandler handles transient HTTP responses and retries the request given the retry options
|
||||
type RetryHandler struct {
|
||||
// The maximum number of times a request can be retried
|
||||
MaxRetries int
|
||||
// The delay in seconds between retries
|
||||
Delay time.Duration
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) retryRequest(
|
||||
ctx context.Context,
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
executionCount int,
|
||||
cumulativeDelay time.Duration,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
respErr error,
|
||||
) (*http.Response, error) {
|
||||
if (respErr != nil || middleware.isRetriableErrorCode(req, resp.StatusCode)) &&
|
||||
middleware.isRetriableRequest(req) &&
|
||||
executionCount < middleware.MaxRetries {
|
||||
executionCount++
|
||||
|
||||
delay := middleware.getRetryDelay(req, resp, exponentialBackoff)
|
||||
|
||||
cumulativeDelay += delay
|
||||
|
||||
timer := time.NewTimer(delay)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Don't retry if the context is marked as done, it will just error out
|
||||
// when we attempt to send the retry anyway.
|
||||
return resp, ctx.Err()
|
||||
|
||||
// Will exit switch-block so the remainder of the code doesn't need to be
|
||||
// indented.
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount))
|
||||
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, Stack(ctx, err).With("retry_count", executionCount)
|
||||
}
|
||||
|
||||
return middleware.retryRequest(ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
executionCount,
|
||||
cumulativeDelay,
|
||||
exponentialBackoff,
|
||||
err)
|
||||
}
|
||||
|
||||
if respErr != nil {
|
||||
return nil, Stack(ctx, respErr).With("retry_count", executionCount)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableErrorCode(req *http.Request, code int) bool {
|
||||
return code == http.StatusInternalServerError || code == http.StatusServiceUnavailable
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableRequest(req *http.Request) bool {
|
||||
isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH"
|
||||
if isBodiedMethod && req.Body != nil {
|
||||
return req.ContentLength != -1
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) getRetryDelay(
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
) time.Duration {
|
||||
var retryAfter string
|
||||
if resp != nil {
|
||||
retryAfter = resp.Header.Get(retryAfterHeader)
|
||||
}
|
||||
|
||||
if retryAfter != "" {
|
||||
retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64)
|
||||
if err == nil {
|
||||
return time.Duration(retryAfterDelay) * time.Second
|
||||
}
|
||||
} // TODO parse the header if it's a date
|
||||
|
||||
return exponentialBackoff.NextBackOff()
|
||||
}
|
||||
@ -1,28 +1,18 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
||||
"github.com/alcionai/clues"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
"github.com/microsoft/kiota-abstractions-go/serialization"
|
||||
kauth "github.com/microsoft/kiota-authentication-azure-go"
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go"
|
||||
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/pii"
|
||||
"github.com/alcionai/corso/src/internal/events"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -52,6 +42,19 @@ type QueryParams struct {
|
||||
Credentials account.M365Config
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Interfaces
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type Servicer interface {
|
||||
// Client() returns msgraph Service client that can be used to process and execute
|
||||
// the majority of the queries to the M365 Backstore
|
||||
Client() *msgraphsdkgo.GraphServiceClient
|
||||
// Adapter() returns GraphRequest adapter used to process large requests, create batches
|
||||
// and page iterators
|
||||
Adapter() *msgraphsdkgo.GraphRequestAdapter
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Service Handler
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -95,7 +98,59 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) {
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// Adapter
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library
|
||||
// with Azure identity package. An adapter object is a necessary to component
|
||||
// to create *msgraphsdk.GraphServiceClient
|
||||
func CreateAdapter(
|
||||
tenant, client, secret string,
|
||||
opts ...option,
|
||||
) (*msgraphsdkgo.GraphRequestAdapter, error) {
|
||||
// Client Provider: Uses Secret for access to tenant-level data
|
||||
cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating m365 client identity")
|
||||
}
|
||||
|
||||
auth, err := kauth.NewAzureIdentityAuthenticationProviderWithScopes(
|
||||
cred,
|
||||
[]string{"https://graph.microsoft.com/.default"},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating azure authentication")
|
||||
}
|
||||
|
||||
httpClient := HTTPClient(opts...)
|
||||
|
||||
return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
|
||||
auth,
|
||||
nil, nil,
|
||||
httpClient)
|
||||
}
|
||||
|
||||
// HTTPClient creates the httpClient with middlewares and timeout configured
|
||||
//
|
||||
// Re-use of http clients is critical, or else we leak OS resources
|
||||
// and consume relatively unbound socket connections. It is important
|
||||
// to centralize this client to be passed downstream where api calls
|
||||
// can utilize it on a per-download basis.
|
||||
func HTTPClient(opts ...option) *http.Client {
|
||||
clientOptions := msgraphsdkgo.GetDefaultClientOptions()
|
||||
clientconfig := (&clientConfig{}).populate(opts...)
|
||||
noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig()
|
||||
middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay)
|
||||
httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
|
||||
httpClient.Timeout = defaultHTTPClientTimeout
|
||||
|
||||
clientconfig.apply(httpClient)
|
||||
|
||||
return httpClient
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// HTTP Client Config
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type clientConfig struct {
|
||||
@ -167,53 +222,9 @@ func MinimumBackoff(dur time.Duration) option {
|
||||
}
|
||||
}
|
||||
|
||||
// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library
|
||||
// with Azure identity package. An adapter object is a necessary to component
|
||||
// to create *msgraphsdk.GraphServiceClient
|
||||
func CreateAdapter(
|
||||
tenant, client, secret string,
|
||||
opts ...option,
|
||||
) (*msgraphsdkgo.GraphRequestAdapter, error) {
|
||||
// Client Provider: Uses Secret for access to tenant-level data
|
||||
cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating m365 client identity")
|
||||
}
|
||||
|
||||
auth, err := kauth.NewAzureIdentityAuthenticationProviderWithScopes(
|
||||
cred,
|
||||
[]string{"https://graph.microsoft.com/.default"},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating azure authentication")
|
||||
}
|
||||
|
||||
httpClient := HTTPClient(opts...)
|
||||
|
||||
return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
|
||||
auth,
|
||||
nil, nil,
|
||||
httpClient)
|
||||
}
|
||||
|
||||
// HTTPClient creates the httpClient with middlewares and timeout configured
|
||||
//
|
||||
// Re-use of http clients is critical, or else we leak OS resources
|
||||
// and consume relatively unbound socket connections. It is important
|
||||
// to centralize this client to be passed downstream where api calls
|
||||
// can utilize it on a per-download basis.
|
||||
func HTTPClient(opts ...option) *http.Client {
|
||||
clientOptions := msgraphsdkgo.GetDefaultClientOptions()
|
||||
clientconfig := (&clientConfig{}).populate(opts...)
|
||||
noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig()
|
||||
middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay)
|
||||
httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
|
||||
httpClient.Timeout = defaultHTTPClientTimeout
|
||||
|
||||
clientconfig.apply(httpClient)
|
||||
|
||||
return httpClient
|
||||
}
|
||||
// ---------------------------------------------------------------------------
|
||||
// Middleware Control
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// GetDefaultMiddlewares creates a new default set of middlewares for the Kiota request adapter
|
||||
func GetMiddlewares(maxRetry int, delay time.Duration) []khttp.Middleware {
|
||||
@ -252,261 +263,3 @@ func GetKiotaMiddlewares(
|
||||
|
||||
return resultMiddlewares
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Interfaces
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type Servicer interface {
|
||||
// Client() returns msgraph Service client that can be used to process and execute
|
||||
// the majority of the queries to the M365 Backstore
|
||||
Client() *msgraphsdkgo.GraphServiceClient
|
||||
// Adapter() returns GraphRequest adapter used to process large requests, create batches
|
||||
// and page iterators
|
||||
Adapter() *msgraphsdkgo.GraphRequestAdapter
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Client Middleware
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// LoggingMiddleware can be used to log the http request sent by the graph client
|
||||
type LoggingMiddleware struct{}
|
||||
|
||||
// well-known path names used by graph api calls
|
||||
// used to un-hide path elements in a pii.SafeURL
|
||||
var SafeURLPathParams = pii.MapWithPlurals(
|
||||
//nolint:misspell
|
||||
"alltime",
|
||||
"analytics",
|
||||
"archive",
|
||||
"beta",
|
||||
"calendargroup",
|
||||
"calendar",
|
||||
"calendarview",
|
||||
"channel",
|
||||
"childfolder",
|
||||
"children",
|
||||
"clone",
|
||||
"column",
|
||||
"contactfolder",
|
||||
"contact",
|
||||
"contenttype",
|
||||
"delta",
|
||||
"drive",
|
||||
"event",
|
||||
"group",
|
||||
"inbox",
|
||||
"instance",
|
||||
"invitation",
|
||||
"item",
|
||||
"joinedteam",
|
||||
"label",
|
||||
"list",
|
||||
"mailfolder",
|
||||
"member",
|
||||
"message",
|
||||
"notification",
|
||||
"page",
|
||||
"primarychannel",
|
||||
"root",
|
||||
"security",
|
||||
"site",
|
||||
"subscription",
|
||||
"team",
|
||||
"unarchive",
|
||||
"user",
|
||||
"v1.0")
|
||||
|
||||
// well-known safe query parameters used by graph api calls
|
||||
//
|
||||
// used to un-hide query params in a pii.SafeURL
|
||||
var SafeURLQueryParams = map[string]struct{}{
|
||||
"deltatoken": {},
|
||||
"startdatetime": {},
|
||||
"enddatetime": {},
|
||||
"$count": {},
|
||||
"$expand": {},
|
||||
"$filter": {},
|
||||
"$select": {},
|
||||
"$top": {},
|
||||
}
|
||||
|
||||
func LoggableURL(url string) pii.SafeURL {
|
||||
return pii.SafeURL{
|
||||
URL: url,
|
||||
SafePathElems: SafeURLPathParams,
|
||||
SafeQueryKeys: SafeURLQueryParams,
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *LoggingMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
ctx := clues.Add(
|
||||
req.Context(),
|
||||
"method", req.Method,
|
||||
"url", LoggableURL(req.URL.String()),
|
||||
"request_len", req.ContentLength)
|
||||
|
||||
// call the next middleware
|
||||
resp, err := pipeline.Next(req, middlewareIndex)
|
||||
|
||||
if strings.Contains(req.URL.String(), "users//") {
|
||||
logger.Ctx(ctx).Error("malformed request url: missing resource")
|
||||
}
|
||||
|
||||
if resp == nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
ctx = clues.Add(ctx, "status", resp.Status, "statusCode", resp.StatusCode)
|
||||
log := logger.Ctx(ctx)
|
||||
|
||||
// Return immediately if the response is good (2xx).
|
||||
// If api logging is toggled, log a body-less dump of the request/resp.
|
||||
if (resp.StatusCode / 100) == 2 {
|
||||
if logger.DebugAPI || os.Getenv(log2xxGraphRequestsEnvKey) != "" {
|
||||
log.Debugw("2xx graph api resp", "response", getRespDump(ctx, resp, os.Getenv(log2xxGraphResponseEnvKey) != ""))
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Log errors according to api debugging configurations.
|
||||
// When debugging is toggled, every non-2xx is recorded with a response dump.
|
||||
// Otherwise, throttling cases and other non-2xx responses are logged
|
||||
// with a slimmer reference for telemetry/supportability purposes.
|
||||
if logger.DebugAPI || os.Getenv(logGraphRequestsEnvKey) != "" {
|
||||
log.Errorw("non-2xx graph api response", "response", getRespDump(ctx, resp, true))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("graph api error: %s", resp.Status)
|
||||
|
||||
// special case for supportability: log all throttling cases.
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
log = log.With(
|
||||
"limit", resp.Header.Get(rateLimitHeader),
|
||||
"remaining", resp.Header.Get(rateRemainingHeader),
|
||||
"reset", resp.Header.Get(rateResetHeader),
|
||||
"retry-after", resp.Header.Get(retryAfterHeader))
|
||||
} else if resp.StatusCode/100 == 4 || resp.StatusCode == http.StatusServiceUnavailable {
|
||||
log = log.With("response", getRespDump(ctx, resp, true))
|
||||
}
|
||||
|
||||
log.Info(msg)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
|
||||
respDump, err := httputil.DumpResponse(resp, getBody)
|
||||
if err != nil {
|
||||
logger.CtxErr(ctx, err).Error("dumping http response")
|
||||
}
|
||||
|
||||
return string(respDump)
|
||||
}
|
||||
|
||||
// Intercept implements the interface and evaluates whether to retry a failed request.
|
||||
func (middleware RetryHandler) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, Stack(ctx, err)
|
||||
}
|
||||
|
||||
exponentialBackOff := backoff.NewExponentialBackOff()
|
||||
exponentialBackOff.InitialInterval = middleware.Delay
|
||||
exponentialBackOff.Reset()
|
||||
|
||||
response, err = middleware.retryRequest(
|
||||
ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
0,
|
||||
0,
|
||||
exponentialBackOff,
|
||||
err)
|
||||
if err != nil {
|
||||
return nil, Stack(ctx, err)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// We're trying to keep calls below the 10k-per-10-minute threshold.
|
||||
// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes,
|
||||
// which is a bit below the mark.
|
||||
// But suppose we have a minute-long dry spell followed by a 10 minute tsunami.
|
||||
// We'll have built up 900 tokens in reserve, so the first 900 calls go through
|
||||
// immediately. Over the next 10 minutes, we'll partition out the other calls
|
||||
// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if
|
||||
// the volume keeps up after that, we'll always stay between 9000 and 9900 out
|
||||
// of 10k.
|
||||
const (
|
||||
perSecond = 15
|
||||
maxCap = 900
|
||||
)
|
||||
|
||||
// Single, global rate limiter at this time. Refinements for method (creates,
|
||||
// versus reads) or service can come later.
|
||||
var limiter = rate.NewLimiter(perSecond, maxCap)
|
||||
|
||||
// QueueRequest will allow the request to occur immediately if we're under the
|
||||
// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until
|
||||
// the next token set is available.
|
||||
func QueueRequest(ctx context.Context) {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
}
|
||||
}
|
||||
|
||||
// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min
|
||||
// request limits.
|
||||
type ThrottleControlMiddleware struct{}
|
||||
|
||||
func (handler *ThrottleControlMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
QueueRequest(req.Context())
|
||||
return pipeline.Next(req, middlewareIndex)
|
||||
}
|
||||
|
||||
// MetricsMiddleware aggregates per-request metrics on the events bus
|
||||
type MetricsMiddleware struct{}
|
||||
|
||||
func (handler *MetricsMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
var (
|
||||
start = time.Now()
|
||||
resp, err = pipeline.Next(req, middlewareIndex)
|
||||
status = "nil-resp"
|
||||
)
|
||||
|
||||
if resp != nil {
|
||||
status = resp.Status
|
||||
}
|
||||
|
||||
events.Inc(events.APICall)
|
||||
events.Inc(events.APICall, status)
|
||||
events.Since(start, events.APICall)
|
||||
events.Since(start, events.APICall, status)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@ -1,19 +0,0 @@
|
||||
package graph
|
||||
|
||||
// Contains helper methods that are common across multiple
|
||||
// Microsoft Graph Applications.
|
||||
|
||||
import (
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||
)
|
||||
|
||||
func UnwrapEmailAddress(contact models.Recipientable) string {
|
||||
var empty string
|
||||
if contact == nil || contact.GetEmailAddress() == nil {
|
||||
return empty
|
||||
}
|
||||
|
||||
return ptr.Val(contact.GetEmailAddress().GetAddress())
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user