Compare commits
9 Commits
main
...
400_with_c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79d7ffa18c | ||
|
|
8dbda8ebda | ||
|
|
21c006ebd8 | ||
|
|
b8930d436d | ||
|
|
eaf252a26c | ||
|
|
93079b0812 | ||
|
|
93406f81a1 | ||
|
|
d3b902b410 | ||
|
|
1c19b22d8c |
@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/spatialcurrent/go-lazy/pkg/lazy"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/idname"
|
||||
@ -362,36 +363,42 @@ func downloadContent(
|
||||
itemID := ptr.Val(item.GetId())
|
||||
ctx = clues.Add(ctx, "item_id", itemID)
|
||||
|
||||
content, err := downloadItem(ctx, iaag, item)
|
||||
if err == nil {
|
||||
return content, nil
|
||||
} else if !graph.IsErrUnauthorizedOrBadToken(err) {
|
||||
return nil, err
|
||||
}
|
||||
// content, err := downloadItem(ctx, iaag, item)
|
||||
// if err == nil {
|
||||
// return content, nil
|
||||
// } else if !graph.IsErrUnauthorizedOrBadToken(err) {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// Assume unauthorized requests are a sign of an expired jwt
|
||||
// token, and that we've overrun the available window to
|
||||
// download the file. Get a fresh url from the cache and attempt to
|
||||
// download again.
|
||||
content, err = readItemContents(ctx, iaag, uc, itemID)
|
||||
if err == nil {
|
||||
logger.Ctx(ctx).Debug("found item in url cache")
|
||||
return content, nil
|
||||
}
|
||||
// // Assume unauthorized requests are a sign of an expired jwt
|
||||
// // token, and that we've overrun the available window to
|
||||
// // download the file. Get a fresh url from the cache and attempt to
|
||||
// // download again.
|
||||
// content, err = readItemContents(ctx, iaag, uc, itemID)
|
||||
// if err == nil {
|
||||
// logger.Ctx(ctx).Debug("found item in url cache")
|
||||
// return content, nil
|
||||
// }
|
||||
|
||||
// Consider cache errors(including deleted items) as cache misses.
|
||||
// Fallback to refetching the item using the graph API.
|
||||
logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
|
||||
counter.Inc(count.URLCacheMiss)
|
||||
// // Consider cache errors(including deleted items) as cache misses.
|
||||
// // Fallback to refetching the item using the graph API.
|
||||
// logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
|
||||
// counter.Inc(count.URLCacheMiss)
|
||||
|
||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "retrieving expired item")
|
||||
var di models.DriveItemable
|
||||
|
||||
for i := 0; i < 1000000; i++ {
|
||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "retrieving expired item")
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Debugf("iteration %d item id %s\n ", i, ptr.Val(di.GetId()))
|
||||
}
|
||||
|
||||
cdi := custom.ToCustomDriveItem(di)
|
||||
|
||||
content, err = downloadItem(ctx, iaag, cdi)
|
||||
content, err := downloadItem(ctx, iaag, cdi)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "content download retry")
|
||||
}
|
||||
@ -469,7 +476,7 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
|
||||
int64(len(oc.driveItems)))
|
||||
defer close(folderProgress)
|
||||
|
||||
semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item())
|
||||
semaphoreCh := make(chan struct{}, 1)
|
||||
defer close(semaphoreCh)
|
||||
|
||||
ctx = clues.Add(ctx,
|
||||
|
||||
@ -42,6 +42,14 @@ func logResp(ctx context.Context, resp *http.Response) {
|
||||
return
|
||||
}
|
||||
|
||||
// Dump both request and response for graph 400 errors. This is a temporary
|
||||
// measure to help us understand why graph is returning transient 400s.
|
||||
if resp.StatusCode == http.StatusBadRequest {
|
||||
log.With("response", getRespDump(ctx, resp, logBody)).
|
||||
With("request", getRequestDump(ctx, resp.Request, true)).
|
||||
Info("graph api bad request")
|
||||
}
|
||||
|
||||
// Log api calls according to api debugging configurations.
|
||||
switch respClass {
|
||||
case 2:
|
||||
@ -61,6 +69,15 @@ func logResp(ctx context.Context, resp *http.Response) {
|
||||
}
|
||||
}
|
||||
|
||||
func logReq(ctx context.Context, req *http.Request) {
|
||||
var (
|
||||
log = logger.Ctx(ctx)
|
||||
)
|
||||
|
||||
log.With("request", getRequestDump(ctx, req, true)).
|
||||
Info("graph api req")
|
||||
}
|
||||
|
||||
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
|
||||
respDump, err := httputil.DumpResponse(resp, getBody)
|
||||
if err != nil {
|
||||
@ -69,3 +86,17 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
|
||||
|
||||
return string(respDump)
|
||||
}
|
||||
|
||||
func getRequestDump(ctx context.Context, req *http.Request, getBody bool) string {
|
||||
reqDump, err := httputil.DumpRequest(req, getBody)
|
||||
if err != nil {
|
||||
logger.CtxErr(ctx, err).Error("dumping http request")
|
||||
}
|
||||
|
||||
// Dump all headers
|
||||
for k, v := range req.Header {
|
||||
logger.Ctx(ctx).Debugf("Header field %q, Value %q\n", k, v)
|
||||
}
|
||||
|
||||
return string(reqDump)
|
||||
}
|
||||
|
||||
@ -118,22 +118,23 @@ func (mw *LoggingMiddleware) Intercept(
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
|
||||
// call the next middleware
|
||||
resp, err := pipeline.Next(req, middlewareIndex)
|
||||
if resp == nil {
|
||||
return resp, err
|
||||
}
|
||||
|
||||
ctx := clues.Add(
|
||||
req.Context(),
|
||||
"method", req.Method,
|
||||
"url", LoggableURL(req.URL.String()),
|
||||
"request_content_len", req.ContentLength,
|
||||
"resp_status", resp.Status,
|
||||
"resp_status_code", resp.StatusCode,
|
||||
"resp_content_len", resp.ContentLength)
|
||||
// ctx := clues.Add(
|
||||
// req.Context(),
|
||||
// "method", req.Method,
|
||||
// "url", LoggableURL(req.URL.String()),
|
||||
// "request_content_len", req.ContentLength,
|
||||
// "resp_status", resp.Status,
|
||||
// "resp_status_code", resp.StatusCode,
|
||||
// "resp_content_len", resp.ContentLength)
|
||||
|
||||
logResp(ctx, resp)
|
||||
//logResp(ctx, resp)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -334,6 +335,8 @@ func (mw *MetricsMiddleware) Intercept(
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
logReq(req.Context(), req)
|
||||
|
||||
var (
|
||||
start = time.Now()
|
||||
resp, err = pipeline.Next(req, middlewareIndex)
|
||||
@ -368,5 +371,16 @@ func (mw *MetricsMiddleware) Intercept(
|
||||
|
||||
events.IncN(xmrui, events.APICall, xmruHeader)
|
||||
|
||||
ctx := clues.Add(
|
||||
req.Context(),
|
||||
"method", req.Method,
|
||||
"url", LoggableURL(req.URL.String()),
|
||||
"request_content_len", req.ContentLength,
|
||||
"resp_status", resp.Status,
|
||||
"resp_status_code", resp.StatusCode,
|
||||
"resp_content_len", resp.ContentLength)
|
||||
|
||||
logResp(ctx, resp)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@ -168,7 +168,7 @@ func KiotaHTTPClient(
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const (
|
||||
defaultDelay = 3 * time.Second
|
||||
defaultDelay = 1 * time.Second
|
||||
defaultHTTPClientTimeout = 1 * time.Hour
|
||||
defaultMaxRetries = 3
|
||||
// FIXME: This should ideally be 0, but if we set to 0, graph
|
||||
@ -285,6 +285,28 @@ func kiotaMiddlewares(
|
||||
cc *clientConfig,
|
||||
counter *count.Bus,
|
||||
) []khttp.Middleware {
|
||||
// Add chaos handler to default middleware
|
||||
// chaosOpt := &khttp.ChaosHandlerOptions{
|
||||
// ChaosStrategy: khttp.Random,
|
||||
// ChaosPercentage: 50,
|
||||
// StatusCode: 503,
|
||||
// ResponseBody: &http.Response{
|
||||
// Status: "Service Unavailable",
|
||||
// StatusCode: 503,
|
||||
// // Retry-After header
|
||||
// // Header: http.Header{
|
||||
// // "Retry-After": []string{"1"},
|
||||
// // },
|
||||
// // Dummy body
|
||||
// Body: io.NopCloser(strings.NewReader("service is down. deal with it")),
|
||||
// },
|
||||
// }
|
||||
|
||||
// chaosHandler, err := khttp.NewChaosHandlerWithOptions(chaosOpt)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
|
||||
retryOptions := khttp.RetryHandlerOptions{
|
||||
ShouldRetry: func(
|
||||
delay time.Duration,
|
||||
@ -311,6 +333,7 @@ func kiotaMiddlewares(
|
||||
khttp.NewParametersNameDecodingHandler(),
|
||||
khttp.NewUserAgentHandler(),
|
||||
&LoggingMiddleware{},
|
||||
// chaosHandler,
|
||||
}
|
||||
|
||||
// Optionally add concurrency limiter middleware if it has been initialized.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user