Compare commits
9 Commits
main
...
400_with_c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79d7ffa18c | ||
|
|
8dbda8ebda | ||
|
|
21c006ebd8 | ||
|
|
b8930d436d | ||
|
|
eaf252a26c | ||
|
|
93079b0812 | ||
|
|
93406f81a1 | ||
|
|
d3b902b410 | ||
|
|
1c19b22d8c |
@ -10,6 +10,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
"github.com/spatialcurrent/go-lazy/pkg/lazy"
|
"github.com/spatialcurrent/go-lazy/pkg/lazy"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common/idname"
|
"github.com/alcionai/corso/src/internal/common/idname"
|
||||||
@ -362,36 +363,42 @@ func downloadContent(
|
|||||||
itemID := ptr.Val(item.GetId())
|
itemID := ptr.Val(item.GetId())
|
||||||
ctx = clues.Add(ctx, "item_id", itemID)
|
ctx = clues.Add(ctx, "item_id", itemID)
|
||||||
|
|
||||||
content, err := downloadItem(ctx, iaag, item)
|
// content, err := downloadItem(ctx, iaag, item)
|
||||||
if err == nil {
|
// if err == nil {
|
||||||
return content, nil
|
// return content, nil
|
||||||
} else if !graph.IsErrUnauthorizedOrBadToken(err) {
|
// } else if !graph.IsErrUnauthorizedOrBadToken(err) {
|
||||||
return nil, err
|
// return nil, err
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Assume unauthorized requests are a sign of an expired jwt
|
// // Assume unauthorized requests are a sign of an expired jwt
|
||||||
// token, and that we've overrun the available window to
|
// // token, and that we've overrun the available window to
|
||||||
// download the file. Get a fresh url from the cache and attempt to
|
// // download the file. Get a fresh url from the cache and attempt to
|
||||||
// download again.
|
// // download again.
|
||||||
content, err = readItemContents(ctx, iaag, uc, itemID)
|
// content, err = readItemContents(ctx, iaag, uc, itemID)
|
||||||
if err == nil {
|
// if err == nil {
|
||||||
logger.Ctx(ctx).Debug("found item in url cache")
|
// logger.Ctx(ctx).Debug("found item in url cache")
|
||||||
return content, nil
|
// return content, nil
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Consider cache errors(including deleted items) as cache misses.
|
// // Consider cache errors(including deleted items) as cache misses.
|
||||||
// Fallback to refetching the item using the graph API.
|
// // Fallback to refetching the item using the graph API.
|
||||||
logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
|
// logger.CtxErr(ctx, err).Debug("url cache miss: refetching from API")
|
||||||
counter.Inc(count.URLCacheMiss)
|
// counter.Inc(count.URLCacheMiss)
|
||||||
|
|
||||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
var di models.DriveItemable
|
||||||
if err != nil {
|
|
||||||
return nil, clues.Wrap(err, "retrieving expired item")
|
for i := 0; i < 1000000; i++ {
|
||||||
|
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "retrieving expired item")
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Ctx(ctx).Debugf("iteration %d item id %s\n ", i, ptr.Val(di.GetId()))
|
||||||
}
|
}
|
||||||
|
|
||||||
cdi := custom.ToCustomDriveItem(di)
|
cdi := custom.ToCustomDriveItem(di)
|
||||||
|
|
||||||
content, err = downloadItem(ctx, iaag, cdi)
|
content, err := downloadItem(ctx, iaag, cdi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, clues.Wrap(err, "content download retry")
|
return nil, clues.Wrap(err, "content download retry")
|
||||||
}
|
}
|
||||||
@ -469,7 +476,7 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
|
|||||||
int64(len(oc.driveItems)))
|
int64(len(oc.driveItems)))
|
||||||
defer close(folderProgress)
|
defer close(folderProgress)
|
||||||
|
|
||||||
semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item())
|
semaphoreCh := make(chan struct{}, 1)
|
||||||
defer close(semaphoreCh)
|
defer close(semaphoreCh)
|
||||||
|
|
||||||
ctx = clues.Add(ctx,
|
ctx = clues.Add(ctx,
|
||||||
|
|||||||
@ -42,6 +42,14 @@ func logResp(ctx context.Context, resp *http.Response) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dump both request and response for graph 400 errors. This is a temporary
|
||||||
|
// measure to help us understand why graph is returning transient 400s.
|
||||||
|
if resp.StatusCode == http.StatusBadRequest {
|
||||||
|
log.With("response", getRespDump(ctx, resp, logBody)).
|
||||||
|
With("request", getRequestDump(ctx, resp.Request, true)).
|
||||||
|
Info("graph api bad request")
|
||||||
|
}
|
||||||
|
|
||||||
// Log api calls according to api debugging configurations.
|
// Log api calls according to api debugging configurations.
|
||||||
switch respClass {
|
switch respClass {
|
||||||
case 2:
|
case 2:
|
||||||
@ -61,6 +69,15 @@ func logResp(ctx context.Context, resp *http.Response) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func logReq(ctx context.Context, req *http.Request) {
|
||||||
|
var (
|
||||||
|
log = logger.Ctx(ctx)
|
||||||
|
)
|
||||||
|
|
||||||
|
log.With("request", getRequestDump(ctx, req, true)).
|
||||||
|
Info("graph api req")
|
||||||
|
}
|
||||||
|
|
||||||
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
|
func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string {
|
||||||
respDump, err := httputil.DumpResponse(resp, getBody)
|
respDump, err := httputil.DumpResponse(resp, getBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -69,3 +86,17 @@ func getRespDump(ctx context.Context, resp *http.Response, getBody bool) string
|
|||||||
|
|
||||||
return string(respDump)
|
return string(respDump)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getRequestDump(ctx context.Context, req *http.Request, getBody bool) string {
|
||||||
|
reqDump, err := httputil.DumpRequest(req, getBody)
|
||||||
|
if err != nil {
|
||||||
|
logger.CtxErr(ctx, err).Error("dumping http request")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dump all headers
|
||||||
|
for k, v := range req.Header {
|
||||||
|
logger.Ctx(ctx).Debugf("Header field %q, Value %q\n", k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(reqDump)
|
||||||
|
}
|
||||||
|
|||||||
@ -118,22 +118,23 @@ func (mw *LoggingMiddleware) Intercept(
|
|||||||
middlewareIndex int,
|
middlewareIndex int,
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
) (*http.Response, error) {
|
) (*http.Response, error) {
|
||||||
|
|
||||||
// call the next middleware
|
// call the next middleware
|
||||||
resp, err := pipeline.Next(req, middlewareIndex)
|
resp, err := pipeline.Next(req, middlewareIndex)
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := clues.Add(
|
// ctx := clues.Add(
|
||||||
req.Context(),
|
// req.Context(),
|
||||||
"method", req.Method,
|
// "method", req.Method,
|
||||||
"url", LoggableURL(req.URL.String()),
|
// "url", LoggableURL(req.URL.String()),
|
||||||
"request_content_len", req.ContentLength,
|
// "request_content_len", req.ContentLength,
|
||||||
"resp_status", resp.Status,
|
// "resp_status", resp.Status,
|
||||||
"resp_status_code", resp.StatusCode,
|
// "resp_status_code", resp.StatusCode,
|
||||||
"resp_content_len", resp.ContentLength)
|
// "resp_content_len", resp.ContentLength)
|
||||||
|
|
||||||
logResp(ctx, resp)
|
//logResp(ctx, resp)
|
||||||
|
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
@ -334,6 +335,8 @@ func (mw *MetricsMiddleware) Intercept(
|
|||||||
middlewareIndex int,
|
middlewareIndex int,
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
) (*http.Response, error) {
|
) (*http.Response, error) {
|
||||||
|
logReq(req.Context(), req)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
resp, err = pipeline.Next(req, middlewareIndex)
|
resp, err = pipeline.Next(req, middlewareIndex)
|
||||||
@ -368,5 +371,16 @@ func (mw *MetricsMiddleware) Intercept(
|
|||||||
|
|
||||||
events.IncN(xmrui, events.APICall, xmruHeader)
|
events.IncN(xmrui, events.APICall, xmruHeader)
|
||||||
|
|
||||||
|
ctx := clues.Add(
|
||||||
|
req.Context(),
|
||||||
|
"method", req.Method,
|
||||||
|
"url", LoggableURL(req.URL.String()),
|
||||||
|
"request_content_len", req.ContentLength,
|
||||||
|
"resp_status", resp.Status,
|
||||||
|
"resp_status_code", resp.StatusCode,
|
||||||
|
"resp_content_len", resp.ContentLength)
|
||||||
|
|
||||||
|
logResp(ctx, resp)
|
||||||
|
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -168,7 +168,7 @@ func KiotaHTTPClient(
|
|||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultDelay = 3 * time.Second
|
defaultDelay = 1 * time.Second
|
||||||
defaultHTTPClientTimeout = 1 * time.Hour
|
defaultHTTPClientTimeout = 1 * time.Hour
|
||||||
defaultMaxRetries = 3
|
defaultMaxRetries = 3
|
||||||
// FIXME: This should ideally be 0, but if we set to 0, graph
|
// FIXME: This should ideally be 0, but if we set to 0, graph
|
||||||
@ -285,6 +285,28 @@ func kiotaMiddlewares(
|
|||||||
cc *clientConfig,
|
cc *clientConfig,
|
||||||
counter *count.Bus,
|
counter *count.Bus,
|
||||||
) []khttp.Middleware {
|
) []khttp.Middleware {
|
||||||
|
// Add chaos handler to default middleware
|
||||||
|
// chaosOpt := &khttp.ChaosHandlerOptions{
|
||||||
|
// ChaosStrategy: khttp.Random,
|
||||||
|
// ChaosPercentage: 50,
|
||||||
|
// StatusCode: 503,
|
||||||
|
// ResponseBody: &http.Response{
|
||||||
|
// Status: "Service Unavailable",
|
||||||
|
// StatusCode: 503,
|
||||||
|
// // Retry-After header
|
||||||
|
// // Header: http.Header{
|
||||||
|
// // "Retry-After": []string{"1"},
|
||||||
|
// // },
|
||||||
|
// // Dummy body
|
||||||
|
// Body: io.NopCloser(strings.NewReader("service is down. deal with it")),
|
||||||
|
// },
|
||||||
|
// }
|
||||||
|
|
||||||
|
// chaosHandler, err := khttp.NewChaosHandlerWithOptions(chaosOpt)
|
||||||
|
// if err != nil {
|
||||||
|
// panic(err)
|
||||||
|
// }
|
||||||
|
|
||||||
retryOptions := khttp.RetryHandlerOptions{
|
retryOptions := khttp.RetryHandlerOptions{
|
||||||
ShouldRetry: func(
|
ShouldRetry: func(
|
||||||
delay time.Duration,
|
delay time.Duration,
|
||||||
@ -311,6 +333,7 @@ func kiotaMiddlewares(
|
|||||||
khttp.NewParametersNameDecodingHandler(),
|
khttp.NewParametersNameDecodingHandler(),
|
||||||
khttp.NewUserAgentHandler(),
|
khttp.NewUserAgentHandler(),
|
||||||
&LoggingMiddleware{},
|
&LoggingMiddleware{},
|
||||||
|
// chaosHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optionally add concurrency limiter middleware if it has been initialized.
|
// Optionally add concurrency limiter middleware if it has been initialized.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user