Compare commits

...

9 Commits

Author SHA1 Message Date
Abhishek Pandey
79d7ffa18c move logging to metrics middleware 2023-12-07 18:46:06 -08:00
Abhishek Pandey
8dbda8ebda Put back req logging 2023-12-07 11:00:47 -08:00
Abhishek Pandey
21c006ebd8 only do get items 2023-12-06 22:24:18 -08:00
Abhishek Pandey
b8930d436d Remove chaos handler 2023-12-06 21:44:31 -08:00
Abhishek Pandey
eaf252a26c Dont log req 2023-12-06 21:35:33 -08:00
Abhishek Pandey
93079b0812 Always GET items 2023-12-06 19:33:01 -08:00
Abhishek Pandey
93406f81a1 Add dummy resp body 2023-12-06 18:49:07 -08:00
Abhishek Pandey
d3b902b410 Add chaos middleware 2023-12-06 18:21:15 -08:00
Abhishek Pandey
1c19b22d8c Add req logging 2023-12-06 18:21:01 -08:00
4 changed files with 109 additions and 34 deletions

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/spatialcurrent/go-lazy/pkg/lazy" "github.com/spatialcurrent/go-lazy/pkg/lazy"
"github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/idname"
@ -362,36 +363,42 @@ func downloadContent(
itemID := ptr.Val(item.GetId()) itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID) ctx = clues.Add(ctx, "item_id", itemID)
content, err := downloadItem(ctx, iaag, item) // content, err := downloadItem(ctx, iaag, item)
if err == nil { // if err == nil {
return content, nil // return content, nil
} else if !graph.IsErrUnauthorizedOrBadToken(err) { // } else if !graph.IsErrUnauthorizedOrBadToken(err) {
return nil, err // return nil, err
} // }
// Assume unauthorized requests are a sign of an expired jwt // // Assume unauthorized requests are a sign of an expired jwt
// token, and that we've overrun the available window to // // token, and that we've overrun the available window to
// download the file. Get a fresh url from the cache and attempt to // // download the file. Get a fresh url from the cache and attempt to
// download again. // // download again.
content, err = readItemContents(ctx, iaag, uc, itemID) // content, err = readItemContents(ctx, iaag, uc, itemID)
if err == nil { // if err == nil {
logger.Ctx(ctx).Debug("found item in url cache") // logger.Ctx(ctx).Debug("found item in url cache")
return content, nil // return content, nil
} // }
// Consider cache errors(including deleted items) as cache misses. // // Consider cache errors(including deleted items) as cache misses.
// Fallback to refetching the item using the graph API. // // Fallback to refetching the item using the graph API.
logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API") // logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
counter.Inc(count.URLCacheMiss) // counter.Inc(count.URLCacheMiss)
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) var di models.DriveItemable
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item") for i := 0; i < 1000000; i++ {
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item")
}
logger.Ctx(ctx).Debugf("iteration %d item id %s\n ", i, ptr.Val(di.GetId()))
} }
cdi := custom.ToCustomDriveItem(di) cdi := custom.ToCustomDriveItem(di)
content, err = downloadItem(ctx, iaag, cdi) content, err := downloadItem(ctx, iaag, cdi)
if err != nil { if err != nil {
return nil, clues.Wrap(err, "content download retry") return nil, clues.Wrap(err, "content download retry")
} }
@ -469,7 +476,7 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
int64(len(oc.driveItems))) int64(len(oc.driveItems)))
defer close(folderProgress) defer close(folderProgress)
semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item()) semaphoreCh := make(chan struct{}, 1)
defer close(semaphoreCh) defer close(semaphoreCh)
ctx = clues.Add(ctx, ctx = clues.Add(ctx,

View File

@ -42,6 +42,14 @@ func logResp(ctx context.Context, resp *http.Response) {
return return
} }
// Dump both request and response for graph 400 errors. This is a temporary
// measure to help us understand why graph is returning transient 400s.
if resp.StatusCode == http.StatusBadRequest {
log.With("response", getRespDump(ctx, resp, logBody)).
With("request", getRequestDump(ctx, resp.Request, true)).
Info("graph api bad request")
}
// Log api calls according to api debugging configurations. // Log api calls according to api debugging configurations.
switch respClass { switch respClass {
case 2: case 2:
@ -61,6 +69,15 @@ func logResp(ctx context.Context, resp *http.Response) {
} }
} }
func logReq(ctx context.Context, req *http.Request) {
var (
log = logger.Ctx(ctx)
)
log.With("request", getRequestDump(ctx, req, true)).
Info("graph api req")
}
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string { func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
respDump, err := httputil.DumpResponse(resp, getBody) respDump, err := httputil.DumpResponse(resp, getBody)
if err != nil { if err != nil {
@ -69,3 +86,17 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
return string(respDump) return string(respDump)
} }
func getRequestDump(ctx context.Context, req *http.Request, getBody bool) string {
reqDump, err := httputil.DumpRequest(req, getBody)
if err != nil {
logger.CtxErr(ctx, err).Error("dumping http request")
}
// Dump all headers
for k, v := range req.Header {
logger.Ctx(ctx).Debugf("Header field %q, Value %q\n", k, v)
}
return string(reqDump)
}

View File

@ -118,22 +118,23 @@ func (mw *LoggingMiddleware) Intercept(
middlewareIndex int, middlewareIndex int,
req *http.Request, req *http.Request,
) (*http.Response, error) { ) (*http.Response, error) {
// call the next middleware // call the next middleware
resp, err := pipeline.Next(req, middlewareIndex) resp, err := pipeline.Next(req, middlewareIndex)
if resp == nil { if resp == nil {
return resp, err return resp, err
} }
ctx := clues.Add( // ctx := clues.Add(
req.Context(), // req.Context(),
"method", req.Method, // "method", req.Method,
"url", LoggableURL(req.URL.String()), // "url", LoggableURL(req.URL.String()),
"request_content_len", req.ContentLength, // "request_content_len", req.ContentLength,
"resp_status", resp.Status, // "resp_status", resp.Status,
"resp_status_code", resp.StatusCode, // "resp_status_code", resp.StatusCode,
"resp_content_len", resp.ContentLength) // "resp_content_len", resp.ContentLength)
logResp(ctx, resp) //logResp(ctx, resp)
return resp, err return resp, err
} }
@ -334,6 +335,8 @@ func (mw *MetricsMiddleware) Intercept(
middlewareIndex int, middlewareIndex int,
req *http.Request, req *http.Request,
) (*http.Response, error) { ) (*http.Response, error) {
logReq(req.Context(), req)
var ( var (
start = time.Now() start = time.Now()
resp, err = pipeline.Next(req, middlewareIndex) resp, err = pipeline.Next(req, middlewareIndex)
@ -368,5 +371,16 @@ func (mw *MetricsMiddleware) Intercept(
events.IncN(xmrui, events.APICall, xmruHeader) events.IncN(xmrui, events.APICall, xmruHeader)
ctx := clues.Add(
req.Context(),
"method", req.Method,
"url", LoggableURL(req.URL.String()),
"request_content_len", req.ContentLength,
"resp_status", resp.Status,
"resp_status_code", resp.StatusCode,
"resp_content_len", resp.ContentLength)
logResp(ctx, resp)
return resp, err return resp, err
} }

View File

@ -168,7 +168,7 @@ func KiotaHTTPClient(
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const ( const (
defaultDelay = 3 * time.Second defaultDelay = 1 * time.Second
defaultHTTPClientTimeout = 1 * time.Hour defaultHTTPClientTimeout = 1 * time.Hour
defaultMaxRetries = 3 defaultMaxRetries = 3
// FIXME: This should ideally be 0, but if we set to 0, graph // FIXME: This should ideally be 0, but if we set to 0, graph
@ -285,6 +285,28 @@ func kiotaMiddlewares(
cc *clientConfig, cc *clientConfig,
counter *count.Bus, counter *count.Bus,
) []khttp.Middleware { ) []khttp.Middleware {
// Add chaos handler to default middleware
// chaosOpt := &khttp.ChaosHandlerOptions{
// ChaosStrategy: khttp.Random,
// ChaosPercentage: 50,
// StatusCode: 503,
// ResponseBody: &http.Response{
// Status: "Service Unavailable",
// StatusCode: 503,
// // Retry-After header
// // Header: http.Header{
// // "Retry-After": []string{"1"},
// // },
// // Dummy body
// Body: io.NopCloser(strings.NewReader("service is down. deal with it")),
// },
// }
// chaosHandler, err := khttp.NewChaosHandlerWithOptions(chaosOpt)
// if err != nil {
// panic(err)
// }
retryOptions := khttp.RetryHandlerOptions{ retryOptions := khttp.RetryHandlerOptions{
ShouldRetry: func( ShouldRetry: func(
delay time.Duration, delay time.Duration,
@ -311,6 +333,7 @@ func kiotaMiddlewares(
khttp.NewParametersNameDecodingHandler(), khttp.NewParametersNameDecodingHandler(),
khttp.NewUserAgentHandler(), khttp.NewUserAgentHandler(),
&LoggingMiddleware{}, &LoggingMiddleware{},
// chaosHandler,
} }
// Optionally add concurrency limiter middleware if it has been initialized. // Optionally add concurrency limiter middleware if it has been initialized.