Skip to content

Commit

Permalink
client-go: reset request body after response is read and closed
Browse files Browse the repository at this point in the history
This commit refactors the retry logic to include resetting the
request body. The reset logic will be called iff it is not the
first attempt. This refactor is nescessary mainly because now
as per the retry logic, we always ensure that the request body
is reset *after* the response body is *fully* read and closed
in order to reuse the same TCP connection.

Previously, the reset of the request body and the call to read
and close the response body were not in the right order, which
leads to race conditions.

This commit also adds a test that verifies the order in which
the function calls are made to ensure that we seek only after
the response body is closed.

Co-authored-by: Madhav Jivrajani <[email protected]>

Kubernetes-commit: 68c8c458ee8f6629eef806c48c1a776dedad3ec4
  • Loading branch information
tkashem authored and k8s-publishing-bot committed Mar 29, 2022
1 parent 8a672f0 commit 01ab7fb
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 55 deletions.
25 changes: 11 additions & 14 deletions rest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
}
url := r.URL().String()
for {
req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}

if err := r.retry.Before(ctx, r); err != nil {
return nil, r.retry.WrapPreviousError(err)
}

req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
Expand Down Expand Up @@ -722,18 +721,17 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {

url := r.URL().String()
for {
if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}

req, err := r.newHTTPRequest(ctx)
if err != nil {
return nil, err
}
if r.body != nil {
req.Body = ioutil.NopCloser(r.body)
}

if err := r.retry.Before(ctx, r); err != nil {
return nil, err
}

resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
r.retry.After(ctx, r, resp, err)
Expand Down Expand Up @@ -859,14 +857,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp

// Right now we make about ten retry attempts if we get a Retry-After response.
for {
if err := r.retry.Before(ctx, r); err != nil {
return r.retry.WrapPreviousError(err)
}
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}

if err := r.retry.Before(ctx, r); err != nil {
return r.retry.WrapPreviousError(err)
}
resp, err := client.Do(req)
updateURLMetrics(ctx, r, resp, err)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
Expand Down
127 changes: 118 additions & 9 deletions rest/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestRequestWatch(t *testing.T) {
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
},
},
{
Expand All @@ -954,7 +954,10 @@ func TestRequestWatch(t *testing.T) {
serverReturns: []responseErr{
{response: nil, err: io.EOF},
},
Empty: true,
Err: true,
ErrFn: func(err error) bool {
return !apierrors.IsInternalError(err)
},
},
{
name: "max retries 2, server always returns a response with Retry-After header",
Expand Down Expand Up @@ -1130,7 +1133,7 @@ func TestRequestStream(t *testing.T) {
},
Err: true,
ErrFn: func(err error) bool {
return apierrors.IsInternalError(err)
return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF")
},
},
{
Expand Down Expand Up @@ -1371,8 +1374,6 @@ func (b *testBackoffManager) Sleep(d time.Duration) {
}

func TestCheckRetryClosesBody(t *testing.T) {
// unblock CI until http:https://issue.k8s.io/108906 is resolved in 1.24
t.Skip("http:https://issue.k8s.io/108906")
count := 0
ch := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -2435,6 +2436,7 @@ func TestRequestWithRetry(t *testing.T) {
body io.Reader
serverReturns responseErr
errExpected error
errContains string
transformFuncInvokedExpected int
roundTripInvokedExpected int
}{
Expand All @@ -2451,7 +2453,7 @@ func TestRequestWithRetry(t *testing.T) {
body: &readSeeker{err: io.EOF},
serverReturns: responseErr{response: retryAfterResponse(), err: nil},
errExpected: nil,
transformFuncInvokedExpected: 1,
transformFuncInvokedExpected: 0,
roundTripInvokedExpected: 1,
},
{
Expand All @@ -2474,7 +2476,7 @@ func TestRequestWithRetry(t *testing.T) {
name: "server returns retryable err, request body Seek returns error, retry aborted",
body: &readSeeker{err: io.EOF},
serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
errExpected: io.ErrUnexpectedEOF,
errContains: "failed to reset the request body while retrying a request: EOF",
transformFuncInvokedExpected: 0,
roundTripInvokedExpected: 1,
},
Expand Down Expand Up @@ -2517,8 +2519,15 @@ func TestRequestWithRetry(t *testing.T) {
if test.transformFuncInvokedExpected != transformFuncInvoked {
t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
}
if test.errExpected != unWrap(err) {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
switch {
case test.errExpected != nil:
if test.errExpected != unWrap(err) {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
}
case len(test.errContains) > 0:
if !strings.Contains(err.Error(), test.errContains) {
t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
}
}
})
}
Expand Down Expand Up @@ -3531,3 +3540,103 @@ func TestTransportConcurrency(t *testing.T) {
})
}
}

// TODO: see if we can consolidate the other trackers into one.
type requestBodyTracker struct {
io.ReadSeeker
f func(string)
}

func (t *requestBodyTracker) Read(p []byte) (int, error) {
t.f("Request.Body.Read")
return t.ReadSeeker.Read(p)
}

func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) {
t.f("Request.Body.Seek")
return t.ReadSeeker.Seek(offset, whence)
}

type responseBodyTracker struct {
io.ReadCloser
f func(string)
}

func (t *responseBodyTracker) Read(p []byte) (int, error) {
t.f("Response.Body.Read")
return t.ReadCloser.Read(p)
}

func (t *responseBodyTracker) Close() error {
t.f("Response.Body.Close")
return t.ReadCloser.Close()
}

type recorder struct {
order []string
}

func (r *recorder) record(call string) {
r.order = append(r.order, call)
}

func TestRequestBodyResetOrder(t *testing.T) {
recorder := &recorder{}
respBodyTracker := &responseBodyTracker{
ReadCloser: nil, // the server will fill it
f: recorder.record,
}

var attempts int
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
defer func() {
attempts++
}()

// read the request body.
ioutil.ReadAll(req.Body)

// first attempt, we send a retry-after
if attempts == 0 {
resp := retryAfterResponse()
respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{}))
resp.Body = respBodyTracker
return resp, nil
}

return &http.Response{StatusCode: http.StatusOK}, nil
})

reqBodyTracker := &requestBodyTracker{
ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most.
f: recorder.record,
}
req := &Request{
verb: "POST",
body: reqBodyTracker,
c: &RESTClient{
content: defaultContentConfig(),
Client: client,
},
backoff: &noSleepBackOff{},
retry: &withRetry{maxRetries: 1},
}

req.Do(context.Background())

expected := []string{
// 1st attempt: the server handler reads the request body
"Request.Body.Read",
// the server sends a retry-after, client reads the
// response body, and closes it
"Response.Body.Read",
"Response.Body.Close",
// client retry logic seeks to the beginning of the request body
"Request.Body.Seek",
// 2nd attempt: the server reads the request body
"Request.Body.Read",
}
if !reflect.DeepEqual(expected, recorder.order) {
t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order))
}
}
53 changes: 21 additions & 32 deletions rest/with_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type WithRetry interface {
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool

// Before should be invoked prior to each attempt, including
// the first one. if an error is returned, the request
// should be aborted immediately.
// the first one. If an error is returned, the request should
// be aborted immediately.
//
// Before may also be additionally responsible for preparing
// the request for the next retry, namely in terms of resetting
// the request body in case it has been read.
Before(ctx context.Context, r *Request) error

// After should be invoked immediately after an attempt is made.
Expand Down Expand Up @@ -194,46 +198,18 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *
r.retryAfter.Wait = time.Duration(seconds) * time.Second
r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)

if err := r.prepareForNextRetry(ctx, restReq); err != nil {
klog.V(4).Infof("Could not retry request - %v", err)
return false
}

return true
}

// prepareForNextRetry is responsible for carrying out operations that need
// to be completed before the next retry is initiated:
// - if the request context is already canceled there is no need to
// retry, the function will return ctx.Err().
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should return an error if
// it fails to do so.
func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error {
if ctx.Err() != nil {
return ctx.Err()
}

// Ensure the response body is fully read and closed before
// we reconnect, so that we reuse the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, 0); err != nil {
return fmt.Errorf("can't Seek() back to beginning of body for %T", request)
}
}

klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
return nil
}

func (r *withRetry) Before(ctx context.Context, request *Request) error {
// If the request context is already canceled there
// is no need to retry.
if ctx.Err() != nil {
r.trackPreviousError(ctx.Err())
return ctx.Err()
}

url := request.URL()

// r.retryAfter represents the retry after parameters calculated
// from the (response, err) tuple from the last attempt, so 'Before'
// can apply these retry after parameters prior to the next attempt.
Expand All @@ -245,6 +221,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
return nil
}

// At this point we've made atleast one attempt, post which the response
// body should have been fully read and closed in order for it to be safe
// to reset the request body before we reconnect, in order for us to reuse
// the same TCP connection.
if seeker, ok := request.body.(io.Seeker); ok && request.body != nil {
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err)
r.trackPreviousError(err)
return err
}
}

// if we are here, we have made attempt(s) al least once before.
if request.backoff != nil {
// TODO(tkashem) with default set to use exponential backoff
Expand All @@ -263,6 +251,7 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error {
return err
}

klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
return nil
}

Expand Down

0 comments on commit 01ab7fb

Please sign in to comment.