diff --git a/README.md b/README.md index 19fe62fbf38..c3894c67e0f 100644 --- a/README.md +++ b/README.md @@ -431,16 +431,17 @@ Options: The `handler` parameter is defined as follow: -* `onConnect(resume)`, invoked when request is dispatched on socket. - * `resume(err): Void`, resume `onData` after returning `false` or abort by passing an `Error` through `err`. +* `onInit(abort)`, invoked when request is enqueued. + * `abort(err): Void`, abort with `err`. * `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. * `statusCode: Number` * `headers: Array|Null` * `socket: Duplex` -* `onHeaders(statusCode, headers): Void`, invoked when statusCode and headers have been received. +* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. * `statusCode: Number` * `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased. + * `resume(): Void`, resume `onData` after returning `false`. * `onData(chunk): Boolean`, invoked when response payload data is received. * `chunk: Buffer` * `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed. diff --git a/benchmarks/index.js b/benchmarks/index.js index 217a380d035..540950d42b7 100644 --- a/benchmarks/index.js +++ b/benchmarks/index.js @@ -149,11 +149,11 @@ class NoopRequest { this.deferred = deferred } - onConnect (resume) { + onInit (resume) { } - onHeaders (statusCode, headers) { + onHeaders (statusCode, headers, resume) { } @@ -171,11 +171,11 @@ class SimpleRequest { this.dst = dst } - onConnect (resume) { - this.dst.on('drain', resume) + onInit (resume) { } - onHeaders (statusCode, headers) { + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) } onData (chunk) { diff --git a/lib/client-connect.js b/lib/client-connect.js index 949508c5b0b..51a0fe5e94d 100644 --- a/lib/client-connect.js +++ b/lib/client-connect.js @@ -18,7 +18,7 @@ class ConnectHandler extends AsyncResource { this.callback = callback } - onConnect () { + onInit (abort) { } onUpgrade (statusCode, headers, socket) { diff --git a/lib/client-pipeline.js b/lib/client-pipeline.js index 11139cbae20..ddf5f0ef0ac 100644 --- a/lib/client-pipeline.js +++ b/lib/client-pipeline.js @@ -80,7 +80,7 @@ class PipelineHandler extends AsyncResource { this.opaque = opts.opaque || null this.handler = handler - this.resume = null + this.abort = null this.req = new PipelineRequest() @@ -104,15 +104,14 @@ class PipelineHandler extends AsyncResource { } }, destroy: (err, callback) => { - const { body, req, res, ret, resume } = this + const { body, req, res, ret, abort } = this if (!err && !ret._readableState.endEmitted) { err = new RequestAbortedError() } - if (resume && err) { - this.resume = null - resume(err) + if (err) { + abort(err) } util.destroy(body, err) @@ -131,18 +130,18 @@ class PipelineHandler extends AsyncResource { this.res = null } - onConnect (resume) { - this.resume = resume + onInit (abort) { + this.abort = abort } - onHeaders (statusCode, headers) { + onHeaders (statusCode, headers, resume) { const { opaque, handler } = this if (statusCode < 200) { return } - this.res = new PipelineResponse(this.resume) + this.res = new PipelineResponse(resume) let body try { diff --git a/lib/client-request.js b/lib/client-request.js index 41854c7880a..bc2392239e1 100644 --- a/lib/client-request.js +++ b/lib/client-request.js @@ -8,16 +8,12 @@ const { const util = require('./util') const { AsyncResource } = require('async_hooks') -const kResume = Symbol('resume') +const kAbort = Symbol('abort') class RequestResponse extends Readable { - constructor (resume) { - super({ autoDestroy: true }) - this[kResume] = resume - } - - _read () { - this[kResume]() + constructor (resume, abort) { + super({ autoDestroy: true, read: resume }) + this[kAbort] = abort } _destroy (err, callback) { @@ -26,7 +22,7 @@ class RequestResponse extends Readable { } if (err) { - this[kResume](err) + this[kAbort](err) } callback(err) @@ -48,22 +44,22 @@ class RequestHandler extends AsyncResource { this.opaque = opts.opaque || null this.callback = callback this.res = null - this.resume = null + this.abort = null } - onConnect (resume) { - this.resume = resume + onInit (abort) { + this.abort = abort } - onHeaders (statusCode, headers) { - const { callback, opaque, resume } = this + onHeaders (statusCode, headers, resume) { + const { callback, opaque } = this if (statusCode < 200) { return } this.resume = null - const body = new RequestResponse(resume) + const body = new RequestResponse(resume, this.abort) this.callback = null this.res = body diff --git a/lib/client-stream.js b/lib/client-stream.js index c1bcf48c909..e58e63ea573 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -28,15 +28,15 @@ class StreamHandler extends AsyncResource { this.factory = factory this.callback = callback this.res = null - this.resume = null + this.abort = null this.trailers = null } - onConnect (resume) { - this.resume = resume + onInit (abort) { + this.abort = abort } - onHeaders (statusCode, headers) { + onHeaders (statusCode, headers, resume) { const { factory, opaque } = this if (statusCode < 200) { @@ -59,10 +59,10 @@ class StreamHandler extends AsyncResource { throw new InvalidReturnValueError('expected Writable') } - res.on('drain', this.resume) + res.on('drain', resume) // TODO: Avoid finished. It registers an unecessary amount of listeners. finished(res, { readable: false }, (err) => { - const { callback, res, opaque, trailers, resume } = this + const { callback, res, opaque, trailers, abort } = this this.res = null if (err || !res.readable) { @@ -70,8 +70,8 @@ class StreamHandler extends AsyncResource { } if (err) { - this.resume = null - resume(err) + this.abort = null + abort(err) } this.callback = null diff --git a/lib/client-upgrade.js b/lib/client-upgrade.js index cc5df1db7c8..0867b048d70 100644 --- a/lib/client-upgrade.js +++ b/lib/client-upgrade.js @@ -18,7 +18,7 @@ class UpgradeHandler extends AsyncResource { this.callback = callback } - onConnect () { + onInit (abort) { } onUpgrade (statusCode, headers, socket) { diff --git a/lib/request.js b/lib/request.js index b914eb2f3f7..fb9e5660bad 100644 --- a/lib/request.js +++ b/lib/request.js @@ -188,6 +188,10 @@ class Request { : null this[kResume] = null + + this[kHandler].onInit((err) => { + this.onError(err) + }) } get expectsPayload () { @@ -234,17 +238,6 @@ class Request { onConnect (resume) { this[kResume] = resume - try { - this[kHandler].onConnect((err) => { - if (err) { - this.onError(err) - } else if (this[kResume]) { - this[kResume]() - } - }) - } catch (err) { - this.onError(err) - } } onUpgrade (statusCode, headers, socket) { @@ -278,7 +271,7 @@ class Request { } try { - this[kHandler].onHeaders(statusCode, headers) + this[kHandler].onHeaders(statusCode, headers, this[kResume]) } catch (err) { this.onError(err) } diff --git a/test/client-dispatch.js b/test/client-dispatch.js index 564fd1d9889..a3ab01e9aa1 100644 --- a/test/client-dispatch.js +++ b/test/client-dispatch.js @@ -49,7 +49,7 @@ test('basic dispatch get', (t) => { method: 'GET', headers: reqHeaders }, { - onConnect () { + onInit () { }, onHeaders (statusCode, headers) { t.strictEqual(statusCode, 200) @@ -101,7 +101,7 @@ test('trailers dispatch get', (t) => { method: 'GET', headers: reqHeaders }, { - onConnect () { + onInit () { }, onHeaders (statusCode, headers) { t.strictEqual(statusCode, 200) @@ -146,7 +146,7 @@ test('dispatch onHeaders error', (t) => { path: '/', method: 'GET' }, { - onConnect () { + onInit () { }, onHeaders (statusCode, headers) { throw _err @@ -181,7 +181,7 @@ test('dispatch onComplete error', (t) => { path: '/', method: 'GET' }, { - onConnect () { + onInit () { }, onHeaders (statusCode, headers) { t.pass() @@ -216,7 +216,7 @@ test('dispatch onData error', (t) => { path: '/', method: 'GET' }, { - onConnect () { + onInit () { }, onHeaders (statusCode, headers) { t.pass() diff --git a/test/pool.js b/test/pool.js index ecf3c2b7e49..86635db5ef6 100644 --- a/test/pool.js +++ b/test/pool.js @@ -414,8 +414,7 @@ test('pool dispatch', (t) => { path: '/', method: 'GET' }, { - onConnect () { - + onInit () { }, onHeaders (statusCode, headers) { t.strictEqual(statusCode, 200)