Commit 239ec44
client-go: refactor retry logic so it can be reused

tkashem authored and k8s-publishing-bot committed May 20, 2021
Parent: 228dada
Kubernetes-commit: 5fdf196b4e9bbba036a43f4c3b5d9ed8af1672cc

Showing 4 changed files with 797 additions and 98 deletions.
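The refactor moves the inline retry bookkeeping out of `Request.request` and behind a new `WithRetry` object stored on the `Request`. The interface itself is defined in one of the changed files that did not load on this page; the following is a minimal sketch of its shape, inferred purely from the call sites visible in the rest/request.go diff below (`SetMaxRetries`, `NextRetry`, `BeforeNextRetry`), so field and parameter details beyond those call sites are assumptions:

```go
// Sketch of the WithRetry contract, inferred from the call sites in the
// rest/request.go diff below; the actual definition is in a changed file
// that did not load on this page and may differ in detail.
package rest

import (
	"context"
	"io"
	"net/http"
	"time"
)

// RetryAfter carries the bookkeeping for one upcoming retry. Only the
// Reason field is visible in the diff below (retryAfter.Reason); Wait and
// Attempt are plausible companions and are assumptions here.
type RetryAfter struct {
	Wait    time.Duration
	Attempt int
	Reason  string
}

// WithRetry gathers the retry logic that was previously inlined in
// Request.request (retries, retryInfo, checkWait, getRetryReason).
type WithRetry interface {
	// SetMaxRetries caps the number of retries; see Request.MaxRetries.
	SetMaxRetries(maxRetries int)

	// NextRetry inspects the latest response (or transport error) and
	// reports whether another attempt should be made, along with the
	// RetryAfter bookkeeping for it. f lets the caller decide which
	// transport errors are retryable.
	NextRetry(req *http.Request, resp *http.Response, err error,
		f func(req *http.Request, err error) bool) (*RetryAfter, bool)

	// BeforeNextRetry does the work that must happen before the next
	// attempt; judging from the inline logic it replaces, that is waiting
	// out the Retry-After interval and rewinding a seekable request body.
	BeforeNextRetry(ctx context.Context, backoff BackoffManager,
		retryAfter *RetryAfter, url string, body io.Reader) error
}
```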
rest/request.go (147 changes: 51 additions, 96 deletions)

```diff
@@ -93,7 +93,6 @@ type Request struct {
 	rateLimiter flowcontrol.RateLimiter
 	backoff     BackoffManager
 	timeout     time.Duration
-	maxRetries  int
 
 	// generic components accessible via method setters
 	verb string
@@ -110,8 +109,9 @@ type Request struct {
 	subresource string
 
 	// output
-	err  error
-	body io.Reader
+	err   error
+	body  io.Reader
+	retry WithRetry
 }
 
 // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
@@ -142,7 +142,7 @@ func NewRequest(c *RESTClient) *Request {
 		backoff:        backoff,
 		timeout:        timeout,
 		pathPrefix:     pathPrefix,
-		maxRetries:     10,
+		retry:          &withRetry{maxRetries: 10},
 		warningHandler: c.warningHandler,
 	}
 
@@ -408,10 +408,7 @@ func (r *Request) Timeout(d time.Duration) *Request {
 // function is specifically called with a different value.
 // A zero maxRetries prevents it from doing retries and returns an error immediately.
 func (r *Request) MaxRetries(maxRetries int) *Request {
-	if maxRetries < 0 {
-		maxRetries = 0
-	}
-	r.maxRetries = maxRetries
+	r.retry.SetMaxRetries(maxRetries)
 	return r
 }
 
@@ -842,6 +839,17 @@ func (r *Request) requestPreflightCheck() error {
 	return nil
 }
 
+func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
+	url := r.URL().String()
+	req, err := http.NewRequest(r.verb, url, r.body)
+	if err != nil {
+		return nil, err
+	}
+	req = req.WithContext(ctx)
+	req.Header = r.headers
+	return req, nil
+}
+
 // request connects to the server and invokes the provided function when a server response is
 // received. It handles retry behavior and up front validation of requests. It will invoke
 // fn at most once. It will return an error if a problem occurred prior to connecting to the
@@ -881,27 +889,22 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
 	}
 
 	// Right now we make about ten retry attempts if we get a Retry-After response.
-	retries := 0
-	var retryInfo string
+	var retryAfter *RetryAfter
 	for {
-
-		url := r.URL().String()
-		req, err := http.NewRequest(r.verb, url, r.body)
+		req, err := r.newHTTPRequest(ctx)
 		if err != nil {
 			return err
 		}
-		req = req.WithContext(ctx)
-		req.Header = r.headers
 
 		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
-		if retries > 0 {
+		if retryAfter != nil {
 			// We are retrying the request that we already sent to the apiserver
 			// at least once before.
 			// This request should also be throttled with the client-internal rate limiter.
-			if err := r.tryThrottleWithInfo(ctx, retryInfo); err != nil {
+			if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
 				return err
 			}
-			retryInfo = ""
+			retryAfter = nil
 		}
 		resp, err := client.Do(req)
 		updateURLMetrics(ctx, r, resp, err)
@@ -910,61 +913,46 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
 		} else {
 			r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
 		}
-		if err != nil {
-			// "Connection reset by peer" or "apiserver is shutting down" are usually transient errors.
-			// Thus in case of "GET" operations, we simply retry it.
-			// We are not automatically retrying "write" operations, as
-			// they are not idempotent.
-			if r.verb != "GET" {
-				return err
-			}
-			// For connection errors and apiserver shutdown errors retry.
-			if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
-				// For the purpose of retry, we set the artificial "retry-after" response.
-				// TODO: Should we clean the original response if it exists?
-				resp = &http.Response{
-					StatusCode: http.StatusInternalServerError,
-					Header:     http.Header{"Retry-After": []string{"1"}},
-					Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
-				}
-			} else {
-				return err
-			}
-		}
 
 		done := func() bool {
-			// Ensure the response body is fully read and closed
-			// before we reconnect, so that we reuse the same TCP
-			// connection.
-			defer func() {
-				const maxBodySlurpSize = 2 << 10
-				if resp.ContentLength <= maxBodySlurpSize {
-					io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
-				}
-				resp.Body.Close()
-			}()
+			defer readAndCloseResponseBody(resp)
 
-			retries++
-			if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
-				retryInfo = getRetryReason(retries, seconds, resp, err)
-				if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
-					_, err := seeker.Seek(0, 0)
-					if err != nil {
-						klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
-						fn(req, resp)
-						return true
-					}
-				}
-
-				klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
-				r.backoff.Sleep(time.Duration(seconds) * time.Second)
-				return false
-			}
-			fn(req, resp)
+			// if the server returns an error in err, the response will be nil.
+			f := func(req *http.Request, resp *http.Response) {
+				if resp == nil {
+					return
+				}
+				fn(req, resp)
+			}
+
+			var retry bool
+			retryAfter, retry = r.retry.NextRetry(req, resp, err, func(req *http.Request, err error) bool {
+				// "Connection reset by peer" or "apiserver is shutting down" are usually transient errors.
+				// Thus in case of "GET" operations, we simply retry it.
+				// We are not automatically retrying "write" operations, as they are not idempotent.
+				if r.verb != "GET" {
+					return false
+				}
+				// For connection errors and apiserver shutdown errors retry.
+				if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
+					return true
+				}
+				return false
+			})
+			if retry {
+				if err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body); err != nil {
+					klog.V(4).Infof("Could not retry request - %v", err)
+					f(req, resp)
+					return true
+				}
+				return false
+			}
+
+			f(req, resp)
 			return true
 		}()
 		if done {
-			return nil
+			return err
 		}
 	}
 }
@@ -1196,19 +1184,6 @@ func isTextResponse(resp *http.Response) bool {
 	return strings.HasPrefix(media, "text/")
 }
 
-// checkWait returns true along with a number of seconds if the server instructed us to wait
-// before retrying.
-func checkWait(resp *http.Response) (int, bool) {
-	switch r := resp.StatusCode; {
-	// any 500 error code and 429 can trigger a wait
-	case r == http.StatusTooManyRequests, r >= 500:
-	default:
-		return 0, false
-	}
-	i, ok := retryAfterSeconds(resp)
-	return i, ok
-}
-
 // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
 // the header was missing or not a valid number.
 func retryAfterSeconds(resp *http.Response) (int, bool) {
@@ -1220,26 +1195,6 @@ func retryAfterSeconds(resp *http.Response) (int, bool) {
 	return 0, false
 }
 
-func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
-	// priority and fairness sets the UID of the FlowSchema associated with a request
-	// in the following response Header.
-	const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
-
-	message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)
-
-	switch {
-	case resp.StatusCode == http.StatusTooManyRequests:
-		// it is server-side throttling from priority and fairness
-		flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
-		return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID)
-	case err != nil:
-		// it's a retriable error
-		return fmt.Sprintf("%s - retry-reason: due to retriable error, error: %v", message, err)
-	default:
-		return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode)
-	}
-}
-
 // Result contains the result of calling Request.Do().
 type Result struct {
 	body []byte
```
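The diff also swaps the inline body-draining logic for a `readAndCloseResponseBody` helper whose definition lives in one of the files not shown here. Judging from the inline code it replaces (drain up to 2<<10 bytes, then close, tolerating a nil response), a sketch of what it presumably does:

```go
// Sketch of the readAndCloseResponseBody helper introduced by this commit;
// its definition is in a file not shown on this page, so this is inferred
// from the inline code it replaces and may differ from the real helper.
package rest

import (
	"io"
	"io/ioutil"
	"net/http"
)

func readAndCloseResponseBody(resp *http.Response) {
	// The caller defers this even when the request failed with a transport
	// error, in which case resp is nil.
	if resp == nil {
		return
	}
	// Ensure the response body is fully read and closed before we
	// reconnect, so that we reuse the same TCP connection.
	const maxBodySlurpSize = 2 << 10
	defer resp.Body.Close()
	if resp.ContentLength <= maxBodySlurpSize {
		io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
	}
}
```

From a caller's perspective nothing changes: `MaxRetries` keeps its contract and now simply forwards the cap to the request's retry object. An illustrative use, where the `restClient` value and the `/healthz` path are assumptions for the sake of the example:

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/rest"
)

// checkHealthz bounds retries on a GET request; MaxRetries(3) now
// delegates to WithRetry.SetMaxRetries(3) under the hood.
func checkHealthz(restClient *rest.RESTClient) error {
	result := restClient.Get().
		AbsPath("/healthz").
		MaxRetries(3).
		Do(context.TODO())
	if err := result.Error(); err != nil {
		return fmt.Errorf("healthz check failed: %w", err)
	}
	fmt.Println("healthz OK")
	return nil
}
```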
The diffs for the remaining three changed files did not load and are not shown on this page.