Skip to content

Commit

Permalink
aws/request: Fix SDK error checking when seeking readers (#2696)
Browse files Browse the repository at this point in the history
Fixes the SDK handling of seeking a reader to ensure errors are not
lost, and are bubbled up.

In several places the SDK ignored Seek errors when attempting to
determine a reader's length, or rewinding the reader for retry attempts.

Related to #2525
  • Loading branch information
jasdel authored Jul 22, 2019
1 parent 4163197 commit 5bab1ec
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 40 deletions.
12 changes: 8 additions & 4 deletions aws/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ func logRequest(r *request.Request) {
if !bodySeekable {
r.SetReaderBody(aws.ReadSeekCloser(r.HTTPRequest.Body))
}
// Reset the request body because dumpRequest will re-wrap the r.HTTPRequest's
// Body as a NoOpCloser and will not be reset after read by the HTTP
// client reader.
r.ResetBody()
// Reset the request body because dumpRequest will re-wrap the
// r.HTTPRequest's Body as a NoOpCloser and will not be reset after
// read by the HTTP client reader.
if err := r.Error; err != nil {
r.Config.Logger.Log(fmt.Sprintf(logReqErrMsg,
r.ClientInfo.ServiceName, r.Operation.Name, err))
return
}
}

r.Config.Logger.Log(fmt.Sprintf(logReqMsg,
Expand Down
15 changes: 10 additions & 5 deletions aws/request/offset_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ type offsetReader struct {
closed bool
}

func newOffsetReader(buf io.ReadSeeker, offset int64) *offsetReader {
func newOffsetReader(buf io.ReadSeeker, offset int64) (*offsetReader, error) {
reader := &offsetReader{}
buf.Seek(offset, sdkio.SeekStart)
_, err := buf.Seek(offset, sdkio.SeekStart)
if err != nil {
return nil, err
}

reader.buf = buf
return reader
return reader, nil
}

// Close will close the instance of the offset reader's access to
Expand Down Expand Up @@ -54,7 +57,9 @@ func (o *offsetReader) Seek(offset int64, whence int) (int64, error) {

// CloseAndCopy will return a new offsetReader with a copy of the old buffer
// and close the old buffer.
func (o *offsetReader) CloseAndCopy(offset int64) *offsetReader {
o.Close()
func (o *offsetReader) CloseAndCopy(offset int64) (*offsetReader, error) {
if err := o.Close(); err != nil {
return nil, err
}
return newOffsetReader(o.buf, offset)
}
30 changes: 20 additions & 10 deletions aws/request/offset_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestOffsetReaderRead(t *testing.T) {
t.Errorf("expect %v, got %v", e, a)
}
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}
if e, a := buf, tempBuf; !bytes.Equal(e, a) {
t.Errorf("expect %v, got %v", e, a)
Expand All @@ -32,27 +32,30 @@ func TestOffsetReaderRead(t *testing.T) {

func TestOffsetReaderSeek(t *testing.T) {
buf := []byte("testData")
reader := newOffsetReader(bytes.NewReader(buf), 0)
reader, err := newOffsetReader(bytes.NewReader(buf), 0)
if err != nil {
t.Fatalf("expect no error, got %v", err)
}

orig, err := reader.Seek(0, sdkio.SeekCurrent)
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}
if e, a := int64(0), orig; e != a {
t.Errorf("expect %v, got %v", e, a)
}

n, err := reader.Seek(0, sdkio.SeekEnd)
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}
if e, a := int64(len(buf)), n; e != a {
t.Errorf("expect %v, got %v", e, a)
}

n, err = reader.Seek(orig, sdkio.SeekStart)
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}
if e, a := int64(0), n; e != a {
t.Errorf("expect %v, got %v", e, a)
Expand All @@ -65,7 +68,7 @@ func TestOffsetReaderClose(t *testing.T) {

err := reader.Close()
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}

tempBuf := make([]byte, len(buf))
Expand All @@ -83,7 +86,10 @@ func TestOffsetReaderCloseAndCopy(t *testing.T) {
tempBuf := make([]byte, len(buf))
reader := &offsetReader{buf: bytes.NewReader(buf)}

newReader := reader.CloseAndCopy(0)
newReader, err := reader.CloseAndCopy(0)
if err != nil {
t.Fatalf("expect no error, got %v", err)
}

n, err := reader.Read(tempBuf)
if e, a := n, 0; e != a {
Expand All @@ -98,7 +104,7 @@ func TestOffsetReaderCloseAndCopy(t *testing.T) {
t.Errorf("expect %v, got %v", e, a)
}
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}
if e, a := buf, tempBuf; !bytes.Equal(e, a) {
t.Errorf("expect %v, got %v", e, a)
Expand All @@ -110,13 +116,17 @@ func TestOffsetReaderCloseAndCopyOffset(t *testing.T) {
tempBuf := make([]byte, len(buf))
reader := &offsetReader{buf: bytes.NewReader(buf)}

newReader := reader.CloseAndCopy(4)
newReader, err := reader.CloseAndCopy(4)
if err != nil {
t.Fatalf("expect no error, got %v", err)
}

n, err := newReader.Read(tempBuf)
if e, a := n, len(buf)-4; e != a {
t.Errorf("expect %v, got %v", e, a)
}
if err != nil {
t.Errorf("expect nil, %v", err)
t.Fatalf("expect no error, got %v", err)
}

expected := []byte{'D', 'a', 't', 'a', 0, 0, 0, 0}
Expand Down
39 changes: 32 additions & 7 deletions aws/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,18 @@ func (r *Request) SetStringBody(s string) {
// SetReaderBody will set the request's body reader.
func (r *Request) SetReaderBody(reader io.ReadSeeker) {
r.Body = reader
r.BodyStart, _ = reader.Seek(0, sdkio.SeekCurrent) // Get the Bodies current offset.

if aws.IsReaderSeekable(reader) {
var err error
// Get the Bodies current offset so retries will start from the same
// initial position.
r.BodyStart, err = reader.Seek(0, sdkio.SeekCurrent)
if err != nil {
r.Error = awserr.New(ErrCodeSerialization,
"failed to determine start of request body", err)
return
}
}
r.ResetBody()
}

Expand Down Expand Up @@ -393,12 +404,16 @@ func (r *Request) Sign() error {
return r.Error
}

func (r *Request) getNextRequestBody() (io.ReadCloser, error) {
func (r *Request) getNextRequestBody() (body io.ReadCloser, err error) {
if r.safeBody != nil {
r.safeBody.Close()
}

r.safeBody = newOffsetReader(r.Body, r.BodyStart)
r.safeBody, err = newOffsetReader(r.Body, r.BodyStart)
if err != nil {
return nil, awserr.New(ErrCodeSerialization,
"failed to get next request body reader", err)
}

// Go 1.8 tightened and clarified the rules code needs to use when building
// requests with the http package. Go 1.8 removed the automatic detection
Expand All @@ -415,10 +430,10 @@ func (r *Request) getNextRequestBody() (io.ReadCloser, error) {
// Related golang/go#18257
l, err := aws.SeekerLen(r.Body)
if err != nil {
return nil, awserr.New(ErrCodeSerialization, "failed to compute request body size", err)
return nil, awserr.New(ErrCodeSerialization,
"failed to compute request body size", err)
}

var body io.ReadCloser
if l == 0 {
body = NoBody
} else if l > 0 {
Expand Down Expand Up @@ -495,13 +510,16 @@ func (r *Request) Send() error {
return r.Error
}

r.prepareRetry()
if err := r.prepareRetry(); err != nil {
r.Error = err
return err
}
continue
}
}
}

func (r *Request) prepareRetry() {
func (r *Request) prepareRetry() error {
if r.Config.LogLevel.Matches(aws.LogDebugWithRequestRetries) {
r.Config.Logger.Log(fmt.Sprintf("DEBUG: Retrying Request %s/%s, attempt %d",
r.ClientInfo.ServiceName, r.Operation.Name, r.RetryCount))
Expand All @@ -512,12 +530,19 @@ func (r *Request) prepareRetry() {
// the request's body even though the Client's Do returned.
r.HTTPRequest = copyHTTPRequest(r.HTTPRequest, nil)
r.ResetBody()
if err := r.Error; err != nil {
return awserr.New(ErrCodeSerialization,
"failed to prepare body for retry", err)

}

// Closing response body to ensure that no response body is leaked
// between retry attempts.
if r.HTTPResponse != nil && r.HTTPResponse.Body != nil {
r.HTTPResponse.Body.Close()
}

return nil
}

func (r *Request) sendRequest() (sendErr error) {
Expand Down
5 changes: 4 additions & 1 deletion aws/request/request_1_8.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package request

import (
"net/http"

"github.com/aws/aws-sdk-go/aws/awserr"
)

// NoBody is a http.NoBody reader instructing Go HTTP client to not include
Expand All @@ -24,7 +26,8 @@ var NoBody = http.NoBody
func (r *Request) ResetBody() {
body, err := r.getNextRequestBody()
if err != nil {
r.Error = err
r.Error = awserr.New(ErrCodeSerialization,
"failed to reset request body", err)
return
}

Expand Down
6 changes: 3 additions & 3 deletions aws/request/request_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ func newRequest(t *testing.T, url string) *http.Request {
return r
}

func TestShouldRetryCancel_nil(t *testing.T) {
func TestShouldRetryError_nil(t *testing.T) {
if shouldRetryError(nil) != true {
t.Error("shouldRetryError(nil) should return true")
}
}

func TestShouldRetryCancel_timeout(t *testing.T) {
func TestShouldRetryError_timeout(t *testing.T) {

tr := &http.Transport{}
defer tr.CloseIdleConnections()
Expand All @@ -50,7 +50,7 @@ func TestShouldRetryCancel_timeout(t *testing.T) {
}
}

func TestShouldRetryCancel_cancelled(t *testing.T) {
func TestShouldRetryError_cancelled(t *testing.T) {
tr := &http.Transport{}
defer tr.CloseIdleConnections()
cli := http.Client{
Expand Down
37 changes: 37 additions & 0 deletions aws/request/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,43 @@ func TestRequestNoConnection(t *testing.T) {
}
}

func TestRequestBodySeekFails(t *testing.T) {
s := awstesting.NewClient()
s.Handlers.Validate.Clear()
s.Handlers.Build.Clear()

out := &testData{}
r := s.NewRequest(&request.Operation{Name: "Operation"}, nil, out)
r.SetReaderBody(&stubSeekFail{
Err: fmt.Errorf("failed to seek reader"),
})
err := r.Send()
if err == nil {
t.Fatal("expect error, but got none")
}
t.Log("Error:", err)

aerr := err.(awserr.Error)
if e, a := request.ErrCodeSerialization, aerr.Code(); e != a {
t.Errorf("expect %v error code, got %v", e, a)
}

}

type stubSeekFail struct {
Err error
}

func (f *stubSeekFail) Read(b []byte) (int, error) {
return len(b), nil
}
func (f *stubSeekFail) ReadAt(b []byte, offset int64) (int, error) {
return len(b), nil
}
func (f *stubSeekFail) Seek(offset int64, mode int) (int64, error) {
return 0, f.Err
}

func getFreePort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions aws/signer/v4/v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,11 @@ func (ctx *signingCtx) buildBodyDigest() error {
if !aws.IsReaderSeekable(ctx.Body) {
return fmt.Errorf("cannot use unseekable request body %T, for signed request with body", ctx.Body)
}
hash = hex.EncodeToString(makeSha256Reader(ctx.Body))
hashBytes, err := makeSha256Reader(ctx.Body)
if err != nil {
return err
}
hash = hex.EncodeToString(hashBytes)
}

if includeSHA256Header {
Expand Down Expand Up @@ -734,10 +738,16 @@ func makeSha256(data []byte) []byte {
return hash.Sum(nil)
}

func makeSha256Reader(reader io.ReadSeeker) []byte {
func makeSha256Reader(reader io.ReadSeeker) (hashBytes []byte, err error) {
hash := sha256.New()
start, _ := reader.Seek(0, sdkio.SeekCurrent)
defer reader.Seek(start, sdkio.SeekStart)
start, err := reader.Seek(0, sdkio.SeekCurrent)
if err != nil {
return nil, err
}
defer func() {
// ensure error is return if unable to seek back to start of payload.
_, err = reader.Seek(start, sdkio.SeekStart)
}()

// Use CopyN to avoid allocating the 32KB buffer in io.Copy for bodies
// smaller than 32KB. Fall back to io.Copy if we fail to determine the size.
Expand All @@ -748,7 +758,7 @@ func makeSha256Reader(reader io.ReadSeeker) []byte {
io.CopyN(hash, reader, size)
}

return hash.Sum(nil)
return hash.Sum(nil), nil
}

const doubleSpace = " "
Expand Down
14 changes: 14 additions & 0 deletions example/service/s3/loggingUploadObjectReadBehavior/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Example

This example shows how you could wrap the reader of an file being
uploaded to Amazon S3 with a logger that will log the usage of the
reader, and print call stacks when the reader's Read, Seek, or ReadAt
methods encounter an error.

# Usage

This bucket uses the bucket name, key, and local file name passed to upload the local file to S3 as the key into the bucket.

```sh
AWS_REGION=us-west-2 AWS_PROFILE=default go run . "mybucket" "10MB.file" ./10MB.file
```
Loading

0 comments on commit 5bab1ec

Please sign in to comment.