add retry middleware (#2406)
## Description Added retry middleware which handles retrial of all HTTP request if, - internalServerError - request timeout with default no of retrials being 3. ## Does this PR need a docs update or release note? - [ ] ⛔ No ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🧹 Tech Debt/Cleanup ## Issue(s) * https://github.com/alcionai/corso/issues/2287 ## Test Plan - [x] 💪 Manual
This commit is contained in:
parent
b646f502ef
commit
2e128726c0
@ -7,6 +7,7 @@ require (
|
||||
github.com/alcionai/clues v0.0.0-20230202001016-cbda58c9de9e
|
||||
github.com/aws/aws-sdk-go v1.44.200
|
||||
github.com/aws/aws-xray-sdk-go v1.8.0
|
||||
github.com/cenkalti/backoff/v4 v4.2.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/kopia/kopia v0.12.2-0.20230123092305-e5387cec0acb
|
||||
|
||||
@ -73,6 +73,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
|
||||
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
|
||||
@ -94,10 +94,7 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl
|
||||
|
||||
var resp models.UserCollectionResponseable
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = service.Client().Users().Get(ctx, userOptions(&userFilterNoGuests))
|
||||
return err
|
||||
})
|
||||
resp, err = service.Client().Users().Get(ctx, userOptions(&userFilterNoGuests))
|
||||
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "getting all users").WithClues(ctx).WithAll(graph.ErrData(err)...)
|
||||
@ -141,10 +138,7 @@ func (c Users) GetByID(ctx context.Context, userID string) (models.Userable, err
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = c.stable.Client().UsersById(userID).Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
resp, err = c.stable.Client().UsersById(userID).Get(ctx, nil)
|
||||
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "getting user").WithClues(ctx).WithAll(graph.ErrData(err)...)
|
||||
@ -162,10 +156,7 @@ func (c Users) GetInfo(ctx context.Context, userID string) (*UserInfo, error) {
|
||||
)
|
||||
|
||||
// TODO: OneDrive
|
||||
err = graph.RunWithRetry(func() error {
|
||||
_, err = c.stable.Client().UsersById(userID).MailFolders().Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
_, err = c.stable.Client().UsersById(userID).MailFolders().Get(ctx, nil)
|
||||
|
||||
if err != nil {
|
||||
if !graph.IsErrExchangeMailFolderNotFound(err) {
|
||||
|
||||
@ -68,11 +68,7 @@ func (c Contacts) GetItem(
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
cont, err = c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
|
||||
cont, err = c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -91,10 +87,7 @@ func (c Contacts) GetContainerByID(
|
||||
|
||||
var resp models.ContactFolderable
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = c.stable.Client().UsersById(userID).ContactFoldersById(dirID).Get(ctx, ofcf)
|
||||
return err
|
||||
})
|
||||
resp, err = c.stable.Client().UsersById(userID).ContactFoldersById(dirID).Get(ctx, ofcf)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -132,11 +125,7 @@ func (c Contacts) EnumerateContainers(
|
||||
ChildFolders()
|
||||
|
||||
for {
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = builder.Get(ctx, ofcf)
|
||||
return err
|
||||
})
|
||||
|
||||
resp, err = builder.Get(ctx, ofcf)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
@ -182,10 +171,7 @@ func (p *contactPager) getPage(ctx context.Context) (api.DeltaPageLinker, error)
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@ -78,10 +78,7 @@ func (c Events) GetContainerByID(
|
||||
|
||||
var cal models.Calendarable
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
cal, err = service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc)
|
||||
return err
|
||||
})
|
||||
cal, err = service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -100,11 +97,7 @@ func (c Events) GetItem(
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
event, err = c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
|
||||
event, err = c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -175,11 +168,7 @@ func (c Events) EnumerateContainers(
|
||||
for {
|
||||
var err error
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = builder.Get(ctx, ofc)
|
||||
return err
|
||||
})
|
||||
|
||||
resp, err = builder.Get(ctx, ofc)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
@ -233,10 +222,7 @@ func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) {
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@ -99,10 +99,7 @@ func (c Mail) GetContainerByID(
|
||||
|
||||
var resp graph.Container
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf)
|
||||
return err
|
||||
})
|
||||
resp, err = service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -118,11 +115,7 @@ func (c Mail) GetItem(
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
mail, err = c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
|
||||
mail, err = c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -188,11 +181,7 @@ func (c Mail) EnumerateContainers(
|
||||
for {
|
||||
var err error
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = builder.Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
|
||||
resp, err = builder.Get(ctx, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
@ -234,10 +223,7 @@ func (p *mailPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) {
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
page, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
page, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return page, err
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package graph
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
||||
@ -96,7 +97,7 @@ func IsErrTimeout(err error) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) || os.IsTimeout(err) {
|
||||
if errors.Is(err, context.DeadlineExceeded) || os.IsTimeout(err) || errors.Is(err, http.ErrHandlerTimeout) {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
101
src/internal/connector/graph/retry_middleware.go
Normal file
101
src/internal/connector/graph/retry_middleware.go
Normal file
@ -0,0 +1,101 @@
|
||||
package graph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Client Middleware
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// RetryHandler handles transient HTTP responses and retries the request given the retry options
|
||||
type RetryHandler struct {
|
||||
// The maximum number of times a request can be retried
|
||||
MaxRetries int
|
||||
// The delay in seconds between retries
|
||||
Delay time.Duration
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) retryRequest(
|
||||
ctx context.Context,
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
executionCount int,
|
||||
cumulativeDelay time.Duration,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
respErr error,
|
||||
) (*http.Response, error) {
|
||||
if (respErr != nil || middleware.isRetriableErrorCode(req, resp.StatusCode)) &&
|
||||
middleware.isRetriableRequest(req) &&
|
||||
executionCount < middleware.MaxRetries &&
|
||||
cumulativeDelay < time.Duration(absoluteMaxDelaySeconds)*time.Second {
|
||||
executionCount++
|
||||
|
||||
delay := middleware.getRetryDelay(req, resp, exponentialBackoff)
|
||||
|
||||
cumulativeDelay += delay
|
||||
|
||||
req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount))
|
||||
|
||||
time.Sleep(delay)
|
||||
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, support.ConnectorStackErrorTraceWrap(err, "maximum retries or unretryable")
|
||||
}
|
||||
|
||||
return middleware.retryRequest(ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
executionCount,
|
||||
cumulativeDelay,
|
||||
exponentialBackoff,
|
||||
err)
|
||||
}
|
||||
|
||||
return resp, support.ConnectorStackErrorTraceWrap(respErr, "maximum retries or unretryable")
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableErrorCode(req *http.Request, code int) bool {
|
||||
return code == http.StatusInternalServerError
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) isRetriableRequest(req *http.Request) bool {
|
||||
isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH"
|
||||
if isBodiedMethod && req.Body != nil {
|
||||
return req.ContentLength != -1
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (middleware RetryHandler) getRetryDelay(
|
||||
req *http.Request,
|
||||
resp *http.Response,
|
||||
exponentialBackoff *backoff.ExponentialBackOff,
|
||||
) time.Duration {
|
||||
var retryAfter string
|
||||
if resp != nil {
|
||||
retryAfter = resp.Header.Get(retryAfterHeader)
|
||||
}
|
||||
|
||||
if retryAfter != "" {
|
||||
retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64)
|
||||
if err == nil {
|
||||
return time.Duration(retryAfterDelay) * time.Second
|
||||
}
|
||||
} // TODO parse the header if it's a date
|
||||
|
||||
return exponentialBackoff.NextBackOff()
|
||||
}
|
||||
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
"github.com/microsoft/kiota-abstractions-go/serialization"
|
||||
ka "github.com/microsoft/kiota-authentication-azure-go"
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
@ -21,8 +22,13 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
|
||||
numberOfRetries = 3
|
||||
logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
|
||||
numberOfRetries = 3
|
||||
retryAttemptHeader = "Retry-Attempt"
|
||||
retryAfterHeader = "Retry-After"
|
||||
defaultMaxRetries = 3
|
||||
defaultDelay = 3 * time.Second
|
||||
absoluteMaxDelaySeconds = 180
|
||||
)
|
||||
|
||||
// AllMetadataFileNames produces the standard set of filenames used to store graph
|
||||
@ -85,6 +91,11 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) {
|
||||
|
||||
type clientConfig struct {
|
||||
noTimeout bool
|
||||
// MaxRetries before failure
|
||||
maxRetries int
|
||||
// The minimum delay in seconds between retries
|
||||
minDelay time.Duration
|
||||
overrideRetryCount bool
|
||||
}
|
||||
|
||||
type option func(*clientConfig)
|
||||
@ -98,6 +109,21 @@ func (c *clientConfig) populate(opts ...option) *clientConfig {
|
||||
return c
|
||||
}
|
||||
|
||||
// apply updates the http.Client with the expected options.
|
||||
func (c *clientConfig) applyMiddlewareConfig() (retry int, delay time.Duration) {
|
||||
retry = defaultMaxRetries
|
||||
if c.overrideRetryCount {
|
||||
retry = c.maxRetries
|
||||
}
|
||||
|
||||
delay = defaultDelay
|
||||
if c.minDelay > 0 {
|
||||
delay = c.minDelay
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// apply updates the http.Client with the expected options.
|
||||
func (c *clientConfig) apply(hc *http.Client) {
|
||||
if c.noTimeout {
|
||||
@ -115,6 +141,19 @@ func NoTimeout() option {
|
||||
}
|
||||
}
|
||||
|
||||
func MaxRetries(max int) option {
|
||||
return func(c *clientConfig) {
|
||||
c.overrideRetryCount = true
|
||||
c.maxRetries = max
|
||||
}
|
||||
}
|
||||
|
||||
func MinimumBackoff(dur time.Duration) option {
|
||||
return func(c *clientConfig) {
|
||||
c.minDelay = dur
|
||||
}
|
||||
}
|
||||
|
||||
// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library
|
||||
// with Azure identity package. An adapter object is a necessary to component
|
||||
// to create *msgraphsdk.GraphServiceClient
|
||||
@ -147,18 +186,51 @@ func CreateAdapter(tenant, client, secret string, opts ...option) (*msgraphsdk.G
|
||||
// can utilize it on a per-download basis.
|
||||
func HTTPClient(opts ...option) *http.Client {
|
||||
clientOptions := msgraphsdk.GetDefaultClientOptions()
|
||||
middlewares := msgraphgocore.GetDefaultMiddlewaresWithOptions(&clientOptions)
|
||||
middlewares = append(middlewares, &LoggingMiddleware{})
|
||||
clientconfig := (&clientConfig{}).populate(opts...)
|
||||
noOfRetries, minRetryDelay := clientconfig.applyMiddlewareConfig()
|
||||
middlewares := GetKiotaMiddlewares(&clientOptions, noOfRetries, minRetryDelay)
|
||||
httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
|
||||
httpClient.Timeout = time.Minute * 3
|
||||
|
||||
(&clientConfig{}).
|
||||
populate(opts...).
|
||||
apply(httpClient)
|
||||
clientconfig.apply(httpClient)
|
||||
|
||||
return httpClient
|
||||
}
|
||||
|
||||
// GetDefaultMiddlewares creates a new default set of middlewares for the Kiota request adapter
|
||||
func GetMiddlewares(maxRetry int, delay time.Duration) []khttp.Middleware {
|
||||
return []khttp.Middleware{
|
||||
&RetryHandler{
|
||||
// The maximum number of times a request can be retried
|
||||
MaxRetries: maxRetry,
|
||||
// The delay in seconds between retries
|
||||
Delay: delay,
|
||||
},
|
||||
khttp.NewRetryHandler(),
|
||||
khttp.NewRedirectHandler(),
|
||||
khttp.NewCompressionHandler(),
|
||||
khttp.NewParametersNameDecodingHandler(),
|
||||
khttp.NewUserAgentHandler(),
|
||||
&LoggingMiddleware{},
|
||||
}
|
||||
}
|
||||
|
||||
// GetKiotaMiddlewares creates a default slice of middleware for the Graph Client.
|
||||
func GetKiotaMiddlewares(options *msgraphgocore.GraphClientOptions,
|
||||
maxRetry int, minDelay time.Duration,
|
||||
) []khttp.Middleware {
|
||||
kiotaMiddlewares := GetMiddlewares(maxRetry, minDelay)
|
||||
graphMiddlewares := []khttp.Middleware{
|
||||
msgraphgocore.NewGraphTelemetryHandler(options),
|
||||
}
|
||||
graphMiddlewaresLen := len(graphMiddlewares)
|
||||
resultMiddlewares := make([]khttp.Middleware, len(kiotaMiddlewares)+graphMiddlewaresLen)
|
||||
copy(resultMiddlewares, graphMiddlewares)
|
||||
copy(resultMiddlewares[graphMiddlewaresLen:], kiotaMiddlewares)
|
||||
|
||||
return resultMiddlewares
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Interfaces
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -245,25 +317,31 @@ func (handler *LoggingMiddleware) Intercept(
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Run a function with retries
|
||||
func RunWithRetry(run func() error) error {
|
||||
var err error
|
||||
// Intercept implements the interface and evaluates whether to retry a failed request.
|
||||
func (middleware RetryHandler) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
|
||||
for i := 0; i < numberOfRetries; i++ {
|
||||
err = run()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// only retry on timeouts and 500-internal-errors.
|
||||
if !(IsErrTimeout(err) || IsInternalServerError(err)) {
|
||||
break
|
||||
}
|
||||
|
||||
if i < numberOfRetries {
|
||||
time.Sleep(time.Duration(3*(i+2)) * time.Second)
|
||||
}
|
||||
response, err := pipeline.Next(req, middlewareIndex)
|
||||
if err != nil && !IsErrTimeout(err) {
|
||||
return response, support.ConnectorStackErrorTraceWrap(err, "maximum retries or unretryable")
|
||||
}
|
||||
|
||||
return support.ConnectorStackErrorTraceWrap(err, "maximum retries or unretryable")
|
||||
exponentialBackOff := backoff.NewExponentialBackOff()
|
||||
exponentialBackOff.InitialInterval = middleware.Delay
|
||||
exponentialBackOff.Reset()
|
||||
|
||||
return middleware.retryRequest(
|
||||
ctx,
|
||||
pipeline,
|
||||
middlewareIndex,
|
||||
req,
|
||||
response,
|
||||
0,
|
||||
0,
|
||||
exponentialBackOff,
|
||||
err)
|
||||
}
|
||||
|
||||
@ -68,10 +68,7 @@ func (p *driveItemPager) GetPage(ctx context.Context) (api.DeltaPageLinker, erro
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -120,10 +117,7 @@ func (p *userDrivePager) GetPage(ctx context.Context) (api.PageLinker, error) {
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
@ -168,10 +162,7 @@ func (p *siteDrivePager) GetPage(ctx context.Context) (api.PageLinker, error) {
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
return err
|
||||
})
|
||||
resp, err = p.builder.Get(ctx, p.options)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@ -32,10 +32,6 @@ const (
|
||||
// TODO: Tune this later along with collectionChannelBufferSize
|
||||
urlPrefetchChannelBufferSize = 5
|
||||
|
||||
// Max number of retries to get doc from M365
|
||||
// Seems to timeout at times because of multiple requests
|
||||
maxRetries = 4 // 1 + 3 retries
|
||||
|
||||
MetaFileSuffix = ".meta"
|
||||
DirMetaFileSuffix = ".dirmeta"
|
||||
DataFileSuffix = ".data"
|
||||
@ -295,10 +291,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
itemMeta = io.NopCloser(strings.NewReader("{}"))
|
||||
itemMetaSize = 2
|
||||
} else {
|
||||
err = graph.RunWithRetry(func() error {
|
||||
itemMeta, itemMetaSize, err = oc.itemMetaReader(ctx, oc.service, oc.driveID, item)
|
||||
return err
|
||||
})
|
||||
itemMeta, itemMetaSize, err = oc.itemMetaReader(ctx, oc.service, oc.driveID, item)
|
||||
|
||||
if err != nil {
|
||||
errUpdater(*item.GetId(), errors.Wrap(err, "failed to get item permissions"))
|
||||
@ -333,38 +326,18 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
err error
|
||||
)
|
||||
|
||||
for i := 1; i <= maxRetries; i++ {
|
||||
_, itemData, err = oc.itemReader(oc.itemClient, item)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if graph.IsErrUnauthorized(err) {
|
||||
// assume unauthorized requests are a sign of an expired
|
||||
// jwt token, and that we've overrun the available window
|
||||
// to download the actual file. Re-downloading the item
|
||||
// will refresh that download url.
|
||||
di, diErr := getDriveItem(ctx, oc.service, oc.driveID, itemID)
|
||||
if diErr != nil {
|
||||
err = errors.Wrap(diErr, "retrieving expired item")
|
||||
break
|
||||
}
|
||||
|
||||
item = di
|
||||
|
||||
continue
|
||||
|
||||
} else if !graph.IsErrTimeout(err) &&
|
||||
!graph.IsInternalServerError(err) {
|
||||
// Don't retry for non-timeout, on-unauth, as
|
||||
// we are already retrying it in the default
|
||||
// retry middleware
|
||||
break
|
||||
}
|
||||
|
||||
if i < maxRetries {
|
||||
time.Sleep(1 * time.Second)
|
||||
_, itemData, err = oc.itemReader(oc.itemClient, item)
|
||||
|
||||
if err != nil && graph.IsErrUnauthorized(err) {
|
||||
// assume unauthorized requests are a sign of an expired
|
||||
// jwt token, and that we've overrun the available window
|
||||
// to download the actual file. Re-downloading the item
|
||||
// will refresh that download url.
|
||||
di, diErr := getDriveItem(ctx, oc.service, oc.driveID, itemID)
|
||||
if diErr != nil {
|
||||
err = errors.Wrap(diErr, "retrieving expired item")
|
||||
}
|
||||
item = di
|
||||
}
|
||||
|
||||
// check for errors following retries
|
||||
|
||||
@ -26,13 +26,15 @@ const (
|
||||
|
||||
// nextLinkKey is used to find the next link in a paged
|
||||
// graph response
|
||||
nextLinkKey = "@odata.nextLink"
|
||||
itemChildrenRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/children"
|
||||
itemByPathRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s:/%s"
|
||||
itemNotFoundErrorCode = "itemNotFound"
|
||||
userMysiteURLNotFound = "BadRequest Unable to retrieve user's mysite URL"
|
||||
userMysiteNotFound = "ResourceNotFound User's mysite not found"
|
||||
contextDeadlineExceeded = "context deadline exceeded"
|
||||
nextLinkKey = "@odata.nextLink"
|
||||
itemChildrenRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/children"
|
||||
itemByPathRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s:/%s"
|
||||
itemNotFoundErrorCode = "itemNotFound"
|
||||
userMysiteURLNotFound = "BadRequest Unable to retrieve user's mysite URL"
|
||||
userMysiteURLNotFoundMsg = "Unable to retrieve user's mysite URL"
|
||||
userMysiteNotFound = "ResourceNotFound User's mysite not found"
|
||||
userMysiteNotFoundMsg = "User's mysite not found"
|
||||
contextDeadlineExceeded = "context deadline exceeded"
|
||||
)
|
||||
|
||||
// DeltaUpdate holds the results of a current delta token. It normally
|
||||
@ -91,9 +93,11 @@ func drives(
|
||||
page, err = pager.GetPage(ctx)
|
||||
if err != nil {
|
||||
// Various error handling. May return an error or perform a retry.
|
||||
detailedError := err.Error()
|
||||
detailedError := support.ConnectorStackErrorTraceWrap(err, "").Error()
|
||||
if strings.Contains(detailedError, userMysiteURLNotFound) ||
|
||||
strings.Contains(detailedError, userMysiteNotFound) {
|
||||
strings.Contains(detailedError, userMysiteURLNotFoundMsg) ||
|
||||
strings.Contains(detailedError, userMysiteNotFound) ||
|
||||
strings.Contains(detailedError, userMysiteNotFoundMsg) {
|
||||
logger.Ctx(ctx).Infof("resource owner does not have a drive")
|
||||
return make([]models.Driveable, 0), nil // no license or drives.
|
||||
}
|
||||
@ -271,10 +275,7 @@ func getFolder(
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
foundItem, err = builder.Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
foundItem, err = builder.Get(ctx, nil)
|
||||
|
||||
if err != nil {
|
||||
var oDataError *odataerrors.ODataError
|
||||
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
discover "github.com/alcionai/corso/src/internal/connector/discovery/api"
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/graph/betasdk/models"
|
||||
"github.com/alcionai/corso/src/internal/connector/graph/betasdk/sites"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
@ -60,10 +59,7 @@ func GetSitePages(
|
||||
|
||||
var page models.SitePageable
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts)
|
||||
return err
|
||||
})
|
||||
page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts)
|
||||
if err != nil {
|
||||
errUpdater(pageID, errors.Wrap(err, support.ConnectorStackErrorTrace(err)+" fetching page"))
|
||||
} else {
|
||||
@ -92,10 +88,7 @@ func FetchPages(ctx context.Context, bs *discover.BetaService, siteID string) ([
|
||||
)
|
||||
|
||||
for {
|
||||
err = graph.RunWithRetry(func() error {
|
||||
resp, err = builder.Get(ctx, opts)
|
||||
return err
|
||||
})
|
||||
resp, err = builder.Get(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, support.ConnectorStackErrorTraceWrap(err, "failed fetching site page")
|
||||
}
|
||||
|
||||
@ -127,10 +127,7 @@ func loadSiteLists(
|
||||
err error
|
||||
)
|
||||
|
||||
err = graph.RunWithRetry(func() error {
|
||||
entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil)
|
||||
return err
|
||||
})
|
||||
entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil)
|
||||
if err != nil {
|
||||
errUpdater(id, support.ConnectorStackErrorTraceWrap(err, ""))
|
||||
return
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user