Skip to content

Commit

Permalink
fix: undici stream throwOnError (#1995)
Browse files Browse the repository at this point in the history
* fix: undici stream throwOnError

* fix: revert move getResolveErrorBodyCallback to api-request and pr comments

* fix: handle error event
  • Loading branch information
dancastillo committed Mar 10, 2023
1 parent 3606c35 commit a0afde8
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
38 changes: 33 additions & 5 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
'use strict'

const { finished } = require('stream')
const { finished, PassThrough } = require('stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError
RequestAbortedError,
ResponseStatusCodeError
} = require('../core/errors')
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
Expand All @@ -16,7 +17,7 @@ class StreamHandler extends AsyncResource {
throw new InvalidArgumentError('invalid opts')
}

const { signal, method, opaque, body, onInfo, responseHeaders } = opts
const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts

try {
if (typeof callback !== 'function') {
Expand Down Expand Up @@ -57,6 +58,7 @@ class StreamHandler extends AsyncResource {
this.trailers = null
this.body = body
this.onInfo = onInfo || null
this.throwOnError = throwOnError || false

if (util.isStream(body)) {
body.on('error', (err) => {
Expand All @@ -76,8 +78,8 @@ class StreamHandler extends AsyncResource {
this.context = context
}

onHeaders (statusCode, rawHeaders, resume) {
const { factory, opaque, context } = this
onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { factory, opaque, context, callback } = this

if (statusCode < 200) {
if (this.onInfo) {
Expand All @@ -96,6 +98,32 @@ class StreamHandler extends AsyncResource {
context
})

if (this.throwOnError && statusCode >= 400) {
const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
const chunks = []
const pt = new PassThrough()
pt
.on('data', (chunk) => chunks.push(chunk))
.on('end', () => {
const payload = Buffer.concat(chunks).toString('utf8')
this.runInAsyncScope(
callback,
null,
new ResponseStatusCodeError(
`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`,
statusCode,
headers,
payload
)
)
})
.on('error', (err) => {
this.onError(err)
})
this.res = pt
return
}

if (
!res ||
typeof res.write !== 'function' ||
Expand Down
2 changes: 1 addition & 1 deletion test/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ test('async hooks client is destroyed', (t) => {
const client = new Client(`http:https://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

client.request({ path: '/', method: 'GET' }, (err, { body }) => {
client.request({ path: '/', method: 'GET', throwOnError: true }, (err, { body }) => {
t.error(err)
body.resume()
body.on('error', (err) => {
Expand Down
59 changes: 59 additions & 0 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -785,4 +785,63 @@ test('stream legacy needDrain', (t) => {
t.pass()
})
})

test('steam throwOnError', (t) => {
t.plan(2)

const errStatusCode = 500
const errMessage = 'Internal Server Error'

const server = createServer((req, res) => {
res.writeHead(errStatusCode, { 'Content-Type': 'text/plain' })
res.end(errMessage)
})
t.teardown(server.close.bind(server))

server.listen(0, async () => {
const client = new Client(`http:https://localhost:${server.address().port}`)
t.teardown(client.close.bind(client))

client.stream({
path: '/',
method: 'GET',
throwOnError: true,
opaque: new PassThrough()
}, ({ opaque: pt }) => {
pt.on('data', () => {
t.fail()
})
return pt
}, (e) => {
t.equal(e.status, errStatusCode)
t.equal(e.body, errMessage)
t.end()
})
})
})

test('steam throwOnError=true, error on stream', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end('asd')
})
t.teardown(server.close.bind(server))

server.listen(0, async () => {
const client = new Client(`http:https://localhost:${server.address().port}`)
t.teardown(client.close.bind(client))

client.stream({
path: '/',
method: 'GET',
throwOnError: true,
opaque: new PassThrough()
}, () => {
throw new Error('asd')
}, (e) => {
t.equal(e.message, 'asd')
})
})
})
})

0 comments on commit a0afde8

Please sign in to comment.