Compare commits

...

9 Commits

Author SHA1 Message Date
Abhishek Pandey
79d7ffa18c move logging to metrics middleware 2023-12-07 18:46:06 -08:00
Abhishek Pandey
8dbda8ebda Put back req logging 2023-12-07 11:00:47 -08:00
Abhishek Pandey
21c006ebd8 only do get items 2023-12-06 22:24:18 -08:00
Abhishek Pandey
b8930d436d Remove chaos handler 2023-12-06 21:44:31 -08:00
Abhishek Pandey
eaf252a26c Dont log req 2023-12-06 21:35:33 -08:00
Abhishek Pandey
93079b0812 Always GET items 2023-12-06 19:33:01 -08:00
Abhishek Pandey
93406f81a1 Add dummy resp body 2023-12-06 18:49:07 -08:00
Abhishek Pandey
d3b902b410 Add chaos middleware 2023-12-06 18:21:15 -08:00
Abhishek Pandey
1c19b22d8c Add req logging 2023-12-06 18:21:01 -08:00
4 changed files with 109 additions and 34 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/spatialcurrent/go-lazy/pkg/lazy"
"github.com/alcionai/corso/src/internal/common/idname"
@ -362,36 +363,42 @@ func downloadContent(
itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID)
content, err := downloadItem(ctx, iaag, item)
if err == nil {
return content, nil
} else if !graph.IsErrUnauthorizedOrBadToken(err) {
return nil, err
}
// content, err := downloadItem(ctx, iaag, item)
// if err == nil {
// return content, nil
// } else if !graph.IsErrUnauthorizedOrBadToken(err) {
// return nil, err
// }
// Assume unauthorized requests are a sign of an expired jwt
// token, and that we've overrun the available window to
// download the file. Get a fresh url from the cache and attempt to
// download again.
content, err = readItemContents(ctx, iaag, uc, itemID)
if err == nil {
logger.Ctx(ctx).Debug("found item in url cache")
return content, nil
}
// // Assume unauthorized requests are a sign of an expired jwt
// // token, and that we've overrun the available window to
// // download the file. Get a fresh url from the cache and attempt to
// // download again.
// content, err = readItemContents(ctx, iaag, uc, itemID)
// if err == nil {
// logger.Ctx(ctx).Debug("found item in url cache")
// return content, nil
// }
// Consider cache errors(including deleted items) as cache misses.
// Fallback to refetching the item using the graph API.
logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
counter.Inc(count.URLCacheMiss)
// // Consider cache errors(including deleted items) as cache misses.
// // Fallback to refetching the item using the graph API.
// logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
// counter.Inc(count.URLCacheMiss)
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item")
var di models.DriveItemable
for i := 0; i < 1000000; i++ {
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item")
}
logger.Ctx(ctx).Debugf("iteration %d item id %s\n ", i, ptr.Val(di.GetId()))
}
cdi := custom.ToCustomDriveItem(di)
content, err = downloadItem(ctx, iaag, cdi)
content, err := downloadItem(ctx, iaag, cdi)
if err != nil {
return nil, clues.Wrap(err, "content download retry")
}
@ -469,7 +476,7 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
int64(len(oc.driveItems)))
defer close(folderProgress)
semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item())
semaphoreCh := make(chan struct{}, 1)
defer close(semaphoreCh)
ctx = clues.Add(ctx,

View File

@ -42,6 +42,14 @@ func logResp(ctx context.Context, resp *http.Response) {
return
}
// Dump both request and response for graph 400 errors. This is a temporary
// measure to help us understand why graph is returning transient 400s.
if resp.StatusCode == http.StatusBadRequest {
log.With("response", getRespDump(ctx, resp, logBody)).
With("request", getRequestDump(ctx, resp.Request, true)).
Info("graph api bad request")
}
// Log api calls according to api debugging configurations.
switch respClass {
case 2:
@ -61,6 +69,15 @@ func logResp(ctx context.Context, resp *http.Response) {
}
}
func logReq(ctx context.Context, req *http.Request) {
var (
log = logger.Ctx(ctx)
)
log.With("request", getRequestDump(ctx, req, true)).
Info("graph api req")
}
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
respDump, err := httputil.DumpResponse(resp, getBody)
if err != nil {
@ -69,3 +86,17 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
return string(respDump)
}
func getRequestDump(ctx context.Context, req *http.Request, getBody bool) string {
reqDump, err := httputil.DumpRequest(req, getBody)
if err != nil {
logger.CtxErr(ctx, err).Error("dumping http request")
}
// Dump all headers
for k, v := range req.Header {
logger.Ctx(ctx).Debugf("Header field %q, Value %q\n", k, v)
}
return string(reqDump)
}

View File

@ -118,22 +118,23 @@ func (mw *LoggingMiddleware) Intercept(
middlewareIndex int,
req *http.Request,
) (*http.Response, error) {
// call the next middleware
resp, err := pipeline.Next(req, middlewareIndex)
if resp == nil {
return resp, err
}
ctx := clues.Add(
req.Context(),
"method", req.Method,
"url", LoggableURL(req.URL.String()),
"request_content_len", req.ContentLength,
"resp_status", resp.Status,
"resp_status_code", resp.StatusCode,
"resp_content_len", resp.ContentLength)
// ctx := clues.Add(
// req.Context(),
// "method", req.Method,
// "url", LoggableURL(req.URL.String()),
// "request_content_len", req.ContentLength,
// "resp_status", resp.Status,
// "resp_status_code", resp.StatusCode,
// "resp_content_len", resp.ContentLength)
logResp(ctx, resp)
//logResp(ctx, resp)
return resp, err
}
@ -334,6 +335,8 @@ func (mw *MetricsMiddleware) Intercept(
middlewareIndex int,
req *http.Request,
) (*http.Response, error) {
logReq(req.Context(), req)
var (
start = time.Now()
resp, err = pipeline.Next(req, middlewareIndex)
@ -368,5 +371,16 @@ func (mw *MetricsMiddleware) Intercept(
events.IncN(xmrui, events.APICall, xmruHeader)
ctx := clues.Add(
req.Context(),
"method", req.Method,
"url", LoggableURL(req.URL.String()),
"request_content_len", req.ContentLength,
"resp_status", resp.Status,
"resp_status_code", resp.StatusCode,
"resp_content_len", resp.ContentLength)
logResp(ctx, resp)
return resp, err
}

View File

@ -168,7 +168,7 @@ func KiotaHTTPClient(
// ---------------------------------------------------------------------------
const (
defaultDelay = 3 * time.Second
defaultDelay = 1 * time.Second
defaultHTTPClientTimeout = 1 * time.Hour
defaultMaxRetries = 3
// FIXME: This should ideally be 0, but if we set to 0, graph
@ -285,6 +285,28 @@ func kiotaMiddlewares(
cc *clientConfig,
counter *count.Bus,
) []khttp.Middleware {
// Add chaos handler to default middleware
// chaosOpt := &khttp.ChaosHandlerOptions{
// ChaosStrategy: khttp.Random,
// ChaosPercentage: 50,
// StatusCode: 503,
// ResponseBody: &http.Response{
// Status: "Service Unavailable",
// StatusCode: 503,
// // Retry-After header
// // Header: http.Header{
// // "Retry-After": []string{"1"},
// // },
// // Dummy body
// Body: io.NopCloser(strings.NewReader("service is down. deal with it")),
// },
// }
// chaosHandler, err := khttp.NewChaosHandlerWithOptions(chaosOpt)
// if err != nil {
// panic(err)
// }
retryOptions := khttp.RetryHandlerOptions{
ShouldRetry: func(
delay time.Duration,
@ -311,6 +333,7 @@ func kiotaMiddlewares(
khttp.NewParametersNameDecodingHandler(),
khttp.NewUserAgentHandler(),
&LoggingMiddleware{},
// chaosHandler,
}
// Optionally add concurrency limiter middleware if it has been initialized.