From 6eb954c664902d1e0d7cdb7fd25cba69f0c80433 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 14 Apr 2023 16:39:16 +0200 Subject: [PATCH] refactor: unify error body handling (#2060) * refactor: unify error body handling * fixup * fixup * fixup --- lib/api/api-request.js | 58 ++++++----------------- lib/api/api-stream.js | 103 ++++++++++++++++++----------------------- lib/api/util.js | 46 ++++++++++++++++++ test/client-stream.js | 2 +- 4 files changed, 108 insertions(+), 101 deletions(-) create mode 100644 lib/api/util.js diff --git a/lib/api/api-request.js b/lib/api/api-request.js index bbb80cc96e7..71d7e926b4c 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -3,10 +3,10 @@ const Readable = require('./readable') const { InvalidArgumentError, - RequestAbortedError, - ResponseStatusCodeError + RequestAbortedError } = require('../core/errors') const util = require('../core/util') +const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('async_hooks') const { addSignal, removeSignal } = require('./abort-signal') @@ -78,40 +78,39 @@ class RequestHandler extends AsyncResource { } onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const { callback, opaque, abort, context, highWaterMark } = this + const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this + + const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) if (statusCode < 200) { if (this.onInfo) { - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) this.onInfo({ statusCode, headers }) } return } - const parsedHeaders = util.parseHeaders(rawHeaders) + const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers const contentType = parsedHeaders['content-type'] const body = new Readable({ resume, abort, contentType, highWaterMark }) this.callback = null this.res = body - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) if (callback !== null) { if (this.throwOnError && statusCode >= 400) { this.runInAsyncScope(getResolveErrorBodyCallback, null, { callback, body, contentType, statusCode, statusMessage, headers } ) - return + } else { + this.runInAsyncScope(callback, null, null, { + statusCode, + headers, + trailers: this.trailers, + opaque, + body, + context + }) } - - this.runInAsyncScope(callback, null, null, { - statusCode, - headers, - trailers: this.trailers, - opaque, - body, - context - }) } } @@ -158,33 +157,6 @@ class RequestHandler extends AsyncResource { } } -async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) { - if (statusCode === 204 || !contentType) { - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) - return - } - - try { - if (contentType.startsWith('application/json')) { - const payload = await body.json() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - - if (contentType.startsWith('text/')) { - const payload = await body.text() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) - return - } - } catch (err) { - // Process in a fallback if error - } - - body.dump() - process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) -} - function request (opts, callback) { if (callback === undefined) { return new Promise((resolve, reject) => { diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index 7560a2e6505..3a8e71a5730 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -4,10 +4,10 @@ const { finished, PassThrough } = require('stream') const { InvalidArgumentError, InvalidReturnValueError, - RequestAbortedError, - ResponseStatusCodeError + RequestAbortedError } = require('../core/errors') const util = require('../core/util') +const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('async_hooks') const { addSignal, removeSignal } = require('./abort-signal') @@ -79,77 +79,66 @@ class StreamHandler extends AsyncResource { } onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const { factory, opaque, context, callback } = this + const { factory, opaque, context, callback, responseHeaders } = this + + const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) if (statusCode < 200) { if (this.onInfo) { - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) this.onInfo({ statusCode, headers }) } return } this.factory = null - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - const res = this.runInAsyncScope(factory, null, { - statusCode, - headers, - opaque, - context - }) - if (this.throwOnError && statusCode >= 400) { - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - const chunks = [] - const pt = new PassThrough() - pt - .on('data', (chunk) => chunks.push(chunk)) - .on('end', () => { - const payload = Buffer.concat(chunks).toString('utf8') - this.runInAsyncScope( - callback, - null, - new ResponseStatusCodeError( - `Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, - statusCode, - headers, - payload - ) - ) - }) - .on('error', (err) => { - this.onError(err) - }) - this.res = pt - return - } + let res - if ( - !res || - typeof res.write !== 'function' || - typeof res.end !== 'function' || - typeof res.on !== 'function' - ) { - throw new InvalidReturnValueError('expected Writable') - } + if (this.throwOnError && statusCode >= 400) { + const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers + const contentType = parsedHeaders['content-type'] + res = new PassThrough() - res.on('drain', resume) - // TODO: Avoid finished. It registers an unnecessary amount of listeners. - finished(res, { readable: false }, (err) => { - const { callback, res, opaque, trailers, abort } = this + this.callback = null + this.runInAsyncScope(getResolveErrorBodyCallback, null, + { callback, body: res, contentType, statusCode, statusMessage, headers } + ) + } else { + res = this.runInAsyncScope(factory, null, { + statusCode, + headers, + opaque, + context + }) - this.res = null - if (err || !res.readable) { - util.destroy(res, err) + if ( + !res || + typeof res.write !== 'function' || + typeof res.end !== 'function' || + typeof res.on !== 'function' + ) { + throw new InvalidReturnValueError('expected Writable') } - this.callback = null - this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) + // TODO: Avoid finished. It registers an unnecessary amount of listeners. + finished(res, { readable: false }, (err) => { + const { callback, res, opaque, trailers, abort } = this - if (err) { - abort() - } - }) + this.res = null + if (err || !res.readable) { + util.destroy(res, err) + } + + this.callback = null + this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) + + if (err) { + abort() + } + }) + } + + res.on('drain', resume) this.res = res diff --git a/lib/api/util.js b/lib/api/util.js new file mode 100644 index 00000000000..bffd70279a4 --- /dev/null +++ b/lib/api/util.js @@ -0,0 +1,46 @@ +const assert = require('assert') +const { + ResponseStatusCodeError +} = require('../core/errors') +const { toUSVString } = require('../core/util') + +async function getResolveErrorBodyCallback ({ callback, body, contentType, statusCode, statusMessage, headers }) { + assert(body) + + let chunks = [] + let limit = 0 + + for await (const chunk of body) { + chunks.push(chunk) + limit += chunk.length + if (limit > 128 * 1024) { + chunks = null + break + } + } + + if (statusCode === 204 || !contentType || !chunks) { + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) + return + } + + try { + if (contentType.startsWith('application/json')) { + const payload = JSON.parse(toUSVString(Buffer.concat(chunks))) + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + + if (contentType.startsWith('text/')) { + const payload = toUSVString(Buffer.concat(chunks)) + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers, payload)) + return + } + } catch (err) { + // Process in a fallback if error + } + + process.nextTick(callback, new ResponseStatusCodeError(`Response status code ${statusCode}${statusMessage ? `: ${statusMessage}` : ''}`, statusCode, headers)) +} + +module.exports = { getResolveErrorBodyCallback } diff --git a/test/client-stream.js b/test/client-stream.js index e67727b74c7..a230c443b67 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -786,7 +786,7 @@ test('stream legacy needDrain', (t) => { }) }) - test('steam throwOnError', (t) => { + test('stream throwOnError', (t) => { t.plan(2) const errStatusCode = 500