Skip to content

Commit

Permalink
Revert "Merge pull request #102581 from liggitt/revert-watch-retry"
Browse files Browse the repository at this point in the history
This reverts commit 5a59a43957c6743995dac67fdda42bf8e0a9ca77, reversing
changes made to 81b9789eaa7bc067f417b5e74d5695dd6dd88a46.

Kubernetes-commit: 892d4fabb845e2461e3655aa414beb6ac322fc99
  • Loading branch information
tkashem authored and k8s-publishing-bot committed Jun 4, 2021
1 parent ded678f commit bbd71da
Show file tree
Hide file tree
Showing 4 changed files with 646 additions and 243 deletions.
196 changes: 133 additions & 63 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,43 +675,88 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
return nil, r.err
}

url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err != nil {

isErrRetryableFunc := func(request *http.Request, err error) bool {
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
if net.IsProbableEOF(err) || net.IsTimeout(err) {
return watch.NewEmptyWatch(), nil
return true
}
return nil, err
return false
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
if result := r.transformResponse(resp, req); result.err != nil {
return nil, result.err
var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}

r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
}

resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.c.base, err, 0)
} else {
r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
}
}
if err == nil && resp.StatusCode == http.StatusOK {
return r.newStreamWatcher(resp)
}

done, transformErr := func() (bool, error) {
defer readAndCloseResponseBody(resp)

var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, isErrRetryableFunc)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
}

if resp == nil {
// the server must have sent us an error in 'err'
return true, nil
}
if result := r.transformResponse(resp, req); result.err != nil {
return true, result.err
}
return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}()
if done {
if isErrRetryableFunc(req, err) {
return watch.NewEmptyWatch(), nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
return nil, err
}
return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
}
}

func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
Expand Down Expand Up @@ -766,49 +811,75 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
return nil, err
}

url := r.URL().String()
req, err := http.NewRequest(r.verb, url, nil)
if err != nil {
return nil, err
}
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}
req = req.WithContext(ctx)
req.Header = r.headers
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {

var retryAfter *RetryAfter
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
return nil, err
}
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}
}
if err != nil {
return nil, err
}

switch {
case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
handleWarnings(resp.Header, r.warningHandler)
return resp.Body, nil
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retryAfter != nil {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal rate limiter.
if err := r.tryThrottleWithInfo(ctx, retryAfter.Reason); err != nil {
return nil, err
}
retryAfter = nil
}

default:
// ensure we close the body before returning the error
defer resp.Body.Close()
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
if r.c.base != nil {
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
return nil, err
}

switch {
case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
handleWarnings(resp.Header, r.warningHandler)
return resp.Body, nil

result := r.transformResponse(resp, req)
err := result.Error()
if err == nil {
err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
default:
done, transformErr := func() (bool, error) {
defer resp.Body.Close()

var retry bool
retryAfter, retry = r.retry.NextRetry(req, resp, err, neverRetryError)
if retry {
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, url, r.body)
if err == nil {
return false, nil
}
klog.V(4).Infof("Could not retry request - %v", err)
}
result := r.transformResponse(resp, req)
if err := result.Error(); err != nil {
return true, err
}
return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
}()
if done {
return nil, transformErr
}
}
return nil, err
}
}

Expand Down Expand Up @@ -940,12 +1011,11 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
return false
})
if retry {
if err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body); err != nil {
klog.V(4).Infof("Could not retry request - %v", err)
f(req, resp)
return true
err := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, req.URL.String(), r.body)
if err == nil {
return false
}
return false
klog.V(4).Infof("Could not retry request - %v", err)
}

f(req, resp)
Expand Down
Loading

0 comments on commit bbd71da

Please sign in to comment.