Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 20, 2020
1 parent 2920be7 commit ea4dad6
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 61 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,16 +431,17 @@ Options:
The `handler` parameter is defined as follow:
* `onConnect(resume)`, invoked when request is dispatched on socket.
* `resume(err): Void`, resume `onData` after returning `false` or abort by passing an `Error` through `err`.
* `onInit(abort)`, invoked when request is enqueued.
* `abort(err): Void`, abort with `err`.
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method.
* `statusCode: Number`
* `headers: Array|Null`
* `socket: Duplex`
* `onHeaders(statusCode, headers): Void`, invoked when statusCode and headers have been received.
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
May be invoked multiple times due to 1xx informational headers.
* `statusCode: Number`
* `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased.
* `resume(): Void`, resume `onData` after returning `false`.
* `onData(chunk): Boolean`, invoked when response payload data is received.
* `chunk: Buffer`
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed.
Expand Down
10 changes: 5 additions & 5 deletions benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ class NoopRequest {
this.deferred = deferred
}

onConnect (resume) {
onInit (resume) {

}

onHeaders (statusCode, headers) {
onHeaders (statusCode, headers, resume) {

}

Expand All @@ -171,11 +171,11 @@ class SimpleRequest {
this.dst = dst
}

onConnect (resume) {
this.dst.on('drain', resume)
onInit (resume) {
}

onHeaders (statusCode, headers) {
onHeaders (statusCode, headers, resume) {
this.dst.on('drain', resume)
}

onData (chunk) {
Expand Down
2 changes: 1 addition & 1 deletion lib/client-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ConnectHandler extends AsyncResource {
this.callback = callback
}

onConnect () {
onInit (abort) {
}

onUpgrade (statusCode, headers, socket) {
Expand Down
17 changes: 8 additions & 9 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class PipelineHandler extends AsyncResource {

this.opaque = opts.opaque || null
this.handler = handler
this.resume = null
this.abort = null

this.req = new PipelineRequest()

Expand All @@ -104,15 +104,14 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret, resume } = this
const { body, req, res, ret, abort } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (resume && err) {
this.resume = null
resume(err)
if (err) {
abort(err)
}

util.destroy(body, err)
Expand All @@ -131,18 +130,18 @@ class PipelineHandler extends AsyncResource {
this.res = null
}

onConnect (resume) {
this.resume = resume
onInit (abort) {
this.abort = abort
}

onHeaders (statusCode, headers) {
onHeaders (statusCode, headers, resume) {
const { opaque, handler } = this

if (statusCode < 200) {
return
}

this.res = new PipelineResponse(this.resume)
this.res = new PipelineResponse(resume)

let body
try {
Expand Down
26 changes: 11 additions & 15 deletions lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ const {
const util = require('./util')
const { AsyncResource } = require('async_hooks')

const kResume = Symbol('resume')
const kAbort = Symbol('abort')

class RequestResponse extends Readable {
constructor (resume) {
super({ autoDestroy: true })
this[kResume] = resume
}

_read () {
this[kResume]()
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
}

_destroy (err, callback) {
Expand All @@ -26,7 +22,7 @@ class RequestResponse extends Readable {
}

if (err) {
this[kResume](err)
this[kAbort](err)
}

callback(err)
Expand All @@ -48,22 +44,22 @@ class RequestHandler extends AsyncResource {
this.opaque = opts.opaque || null
this.callback = callback
this.res = null
this.resume = null
this.abort = null
}

onConnect (resume) {
this.resume = resume
onInit (abort) {
this.abort = abort
}

onHeaders (statusCode, headers) {
const { callback, opaque, resume } = this
onHeaders (statusCode, headers, resume) {
const { callback, opaque } = this

if (statusCode < 200) {
return
}

this.resume = null
const body = new RequestResponse(resume)
const body = new RequestResponse(resume, this.abort)

this.callback = null
this.res = body
Expand Down
16 changes: 8 additions & 8 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.resume = null
this.abort = null
this.trailers = null
}

onConnect (resume) {
this.resume = resume
onInit (abort) {
this.abort = abort
}

onHeaders (statusCode, headers) {
onHeaders (statusCode, headers, resume) {
const { factory, opaque } = this

if (statusCode < 200) {
Expand All @@ -59,19 +59,19 @@ class StreamHandler extends AsyncResource {
throw new InvalidReturnValueError('expected Writable')
}

res.on('drain', this.resume)
res.on('drain', resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, resume } = this
const { callback, res, opaque, trailers, abort } = this

this.res = null
if (err || !res.readable) {
util.destroy(res, err)
}

if (err) {
this.resume = null
resume(err)
this.abort = null
abort(err)
}

this.callback = null
Expand Down
2 changes: 1 addition & 1 deletion lib/client-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class UpgradeHandler extends AsyncResource {
this.callback = callback
}

onConnect () {
onInit (abort) {
}

onUpgrade (statusCode, headers, socket) {
Expand Down
17 changes: 5 additions & 12 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ class Request {
: null

this[kResume] = null

this[kHandler].onInit((err) => {
this.onError(err)
})
}

get expectsPayload () {
Expand Down Expand Up @@ -234,17 +238,6 @@ class Request {

onConnect (resume) {
this[kResume] = resume
try {
this[kHandler].onConnect((err) => {
if (err) {
this.onError(err)
} else if (this[kResume]) {
this[kResume]()
}
})
} catch (err) {
this.onError(err)
}
}

onUpgrade (statusCode, headers, socket) {
Expand Down Expand Up @@ -278,7 +271,7 @@ class Request {
}

try {
this[kHandler].onHeaders(statusCode, headers)
this[kHandler].onHeaders(statusCode, headers, this[kResume])
} catch (err) {
this.onError(err)
}
Expand Down
10 changes: 5 additions & 5 deletions test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ test('basic dispatch get', (t) => {
method: 'GET',
headers: reqHeaders
}, {
onConnect () {
onInit () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
Expand Down Expand Up @@ -101,7 +101,7 @@ test('trailers dispatch get', (t) => {
method: 'GET',
headers: reqHeaders
}, {
onConnect () {
onInit () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
Expand Down Expand Up @@ -146,7 +146,7 @@ test('dispatch onHeaders error', (t) => {
path: '/',
method: 'GET'
}, {
onConnect () {
onInit () {
},
onHeaders (statusCode, headers) {
throw _err
Expand Down Expand Up @@ -181,7 +181,7 @@ test('dispatch onComplete error', (t) => {
path: '/',
method: 'GET'
}, {
onConnect () {
onInit () {
},
onHeaders (statusCode, headers) {
t.pass()
Expand Down Expand Up @@ -216,7 +216,7 @@ test('dispatch onData error', (t) => {
path: '/',
method: 'GET'
}, {
onConnect () {
onInit () {
},
onHeaders (statusCode, headers) {
t.pass()
Expand Down
3 changes: 1 addition & 2 deletions test/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,7 @@ test('pool dispatch', (t) => {
path: '/',
method: 'GET'
}, {
onConnect () {

onInit () {
},
onHeaders (statusCode, headers) {
t.strictEqual(statusCode, 200)
Expand Down

0 comments on commit ea4dad6

Please sign in to comment.