From ded60f65772371b52d575e602a82551905bbeda8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 13 Apr 2023 09:38:02 +0200 Subject: [PATCH] feat: allow overriding hwm (#2057) --- lib/api/api-request.js | 11 ++++++++--- lib/api/readable.js | 9 +++++++-- test/client-request.js | 23 +++++++++++++++++++++++ test/readable.test.js | 2 +- types/dispatcher.d.ts | 2 ++ 5 files changed, 41 insertions(+), 6 deletions(-) diff --git a/lib/api/api-request.js b/lib/api/api-request.js index b4674878d2e..bbb80cc96e7 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -16,13 +16,17 @@ class RequestHandler extends AsyncResource { throw new InvalidArgumentError('invalid opts') } - const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts + const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts try { if (typeof callback !== 'function') { throw new InvalidArgumentError('invalid callback') } + if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) { + throw new InvalidArgumentError('invalid highWaterMark') + } + if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') } @@ -53,6 +57,7 @@ class RequestHandler extends AsyncResource { this.context = null this.onInfo = onInfo || null this.throwOnError = throwOnError + this.highWaterMark = highWaterMark if (util.isStream(body)) { body.on('error', (err) => { @@ -73,7 +78,7 @@ class RequestHandler extends AsyncResource { } onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const { callback, opaque, abort, context } = this + const { callback, opaque, abort, context, highWaterMark } = this if (statusCode < 200) { if (this.onInfo) { @@ -85,7 +90,7 @@ class RequestHandler extends AsyncResource { const parsedHeaders = util.parseHeaders(rawHeaders) const contentType = parsedHeaders['content-type'] - const body = new Readable(resume, abort, contentType) + const body = new Readable({ resume, abort, contentType, highWaterMark }) this.callback = null this.res = body diff --git a/lib/api/readable.js b/lib/api/readable.js index a184e8eb51b..398a75ba8bb 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -17,11 +17,16 @@ const kAbort = Symbol('abort') const kContentType = Symbol('kContentType') module.exports = class BodyReadable extends Readable { - constructor (resume, abort, contentType = '') { + constructor ({ + resume, + abort, + contentType = '', + highWaterMark = 64 * 1024 // Same as nodejs fs streams. + }) { super({ autoDestroy: true, read: resume, - highWaterMark: 64 * 1024 // Same as nodejs fs streams. + highWaterMark }) this._readableState.dataEmitted = false diff --git a/test/client-request.js b/test/client-request.js index 2d73e9892ee..626db41187a 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -75,6 +75,29 @@ test('request dump with abort signal', (t) => { }) }) +test('request hwm', (t) => { + t.plan(2) + const server = createServer((req, res) => { + res.write('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.request({ + path: '/', + method: 'GET', + highWaterMark: 1000 + }, (err, { body }) => { + t.error(err) + t.same(body.readableHighWaterMark, 1000) + body.dump() + }) + }) +}) + test('request abort before headers', (t) => { t.plan(6) diff --git a/test/readable.test.js b/test/readable.test.js index da329939896..3f4f7939f94 100644 --- a/test/readable.test.js +++ b/test/readable.test.js @@ -8,7 +8,7 @@ test('avoid body reordering', async function (t) { } function abort () { } - const r = new Readable(resume, abort) + const r = new Readable({ resume, abort }) r.push(Buffer.from('hello')) diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index c6b0c8875eb..412520386f3 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -142,6 +142,8 @@ declare namespace Dispatcher { onInfo?: (info: { statusCode: number, headers: Record }) => void; /** Default: `null` */ responseHeader?: 'raw' | null; + /** Default: `64 KiB` */ + highWaterMark?: number; } export interface PipelineOptions extends RequestOptions { /** `true` if the `handler` will return an object stream. Default: `false` */