Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 20, 2020
1 parent adb4fae commit c9395eb
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 61 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,16 +431,18 @@ Options:
The `handler` parameter is defined as follow:
* `onConnect(abort)`, invoked before request is dispatched on socket.
* `abort(): Void`, abort request.
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method.
* `statusCode: Number`
* `headers: Array|Null`
* `socket: Duplex`
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
May be invoked multiple times due to 1xx informational headers.
* `statusCode: Number`
* `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased.
* `resume(): Void`, resume `onData` after returning `false`.
* `onData(chunk): Null|Boolean`, invoked when response payload data is received.
* `onData(chunk): Boolean`, invoked when response payload data is received.
* `chunk: Buffer`
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed.
* `trailers: Array|Null`
Expand Down
9 changes: 8 additions & 1 deletion benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ class NoopRequest {
this.deferred = deferred
}

onHeaders () {
onConnect (abort) {

}

onHeaders (statusCode, headers, resume) {

}

Expand All @@ -167,6 +171,9 @@ class SimpleRequest {
this.dst = dst
}

onConnect (abort) {
}

onHeaders (statusCode, headers, resume) {
this.dst.on('drain', resume)
}
Expand Down
3 changes: 3 additions & 0 deletions lib/client-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class ConnectHandler extends AsyncResource {
this.callback = callback
}

onConnect (abort) {
}

onUpgrade (statusCode, headers, socket) {
const { callback, opaque } = this

Expand Down
37 changes: 21 additions & 16 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ class PipelineRequest extends Readable {

class PipelineResponse extends Readable {
constructor (resume) {
super({ autoDestroy: true, read: resume })
super({ autoDestroy: true })
this[kResume] = resume
}

_destroy (err, callback) {
this._read()
_read () {
this[kResume]()
}

_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}
Expand All @@ -77,6 +80,7 @@ class PipelineHandler extends AsyncResource {

this.opaque = opts.opaque || null
this.handler = handler
this.abort = null

this.req = new PipelineRequest()

Expand All @@ -100,12 +104,16 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret } = this
const { body, req, res, ret, abort } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (abort && err) {
abort()
}

util.destroy(body, err)
util.destroy(req, err)
util.destroy(res, err)
Expand All @@ -122,6 +130,15 @@ class PipelineHandler extends AsyncResource {
this.res = null
}

onConnect (abort) {
const { ret } = this
if (ret.destroyed) {
abort()
} else {
this.abort = abort
}
}

onHeaders (statusCode, headers, resume) {
const { opaque, handler } = this

Expand Down Expand Up @@ -180,29 +197,17 @@ class PipelineHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (res._readableState.destroyed) {
return
}

return res.push(chunk)
}

onComplete (trailers) {
const { res } = this

if (res._readableState.destroyed) {
return
}

res.push(null)
}

onError (err) {
const { ret } = this

this.handler = null

util.destroy(ret, err)
}
}
Expand Down
30 changes: 15 additions & 15 deletions lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@ const {
const util = require('./util')
const { AsyncResource } = require('async_hooks')

const kAbort = Symbol('abort')

class RequestResponse extends Readable {
constructor (resume) {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
}

_destroy (err, callback) {
this._read()

if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this[kAbort]()
}

callback(err)
}
}
Expand All @@ -39,16 +44,21 @@ class RequestHandler extends AsyncResource {
this.opaque = opts.opaque || null
this.callback = callback
this.res = null
this.abort = null
}

onConnect (abort) {
this.abort = abort
}

onHeaders (statusCode, headers, resume) {
const { callback, opaque } = this
const { callback, opaque, abort } = this

if (statusCode < 200) {
return
}

const body = new RequestResponse(resume)
const body = new RequestResponse(resume, abort)

this.callback = null
this.res = body
Expand All @@ -63,21 +73,11 @@ class RequestHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (res._readableState.destroyed) {
return
}

return res.push(chunk)
}

onComplete (trailers) {
const { res } = this

if (res._readableState.destroyed) {
return
}

res.push(null)
}

Expand Down
24 changes: 11 additions & 13 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.abort = null
this.trailers = null
}

onConnect (abort) {
this.abort = abort
}

onHeaders (statusCode, headers, resume) {
const { factory, opaque } = this

Expand All @@ -57,13 +62,17 @@ class StreamHandler extends AsyncResource {
res.on('drain', resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers } = this
const { callback, res, opaque, trailers, abort } = this

this.res = null
if (err || !res.readable) {
util.destroy(res, err)
}

if (err) {
abort()
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
})
Expand All @@ -73,23 +82,12 @@ class StreamHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (util.isDestroyed(res)) {
return
}

return res.write(chunk)
}

onComplete (trailers) {
const { res } = this

if (util.isDestroyed(res)) {
return
}

this.trailers = trailers ? util.parseHeaders(trailers) : {}

res.end()
}

Expand All @@ -101,7 +99,7 @@ class StreamHandler extends AsyncResource {
if (res) {
this.res = null
util.destroy(res, err)
} else {
} else if (callback) {
this.callback = null
this.runInAsyncScope(callback, null, err, { opaque })
}
Expand Down
3 changes: 3 additions & 0 deletions lib/client-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class UpgradeHandler extends AsyncResource {
this.callback = callback
}

onConnect (abort) {
}

onUpgrade (statusCode, headers, socket) {
const { callback, opaque } = this

Expand Down
18 changes: 13 additions & 5 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class Client extends EventEmitter {
this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
this[kDrained] = false
this[kMaxAbortedPayload] = maxAbortedPayload || 1048576
this[kResume] = () => {
resume(this)
}

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
Expand Down Expand Up @@ -932,11 +935,10 @@ function _resume (client) {
return
}

try {
write(client, request)
if (write(client, request)) {
client[kPendingIdx]++
} catch (err) {
request.onError(err)
} else {
client[kQueue].splice(client[kPendingIdx], 1)
}
}
}
Expand Down Expand Up @@ -965,7 +967,11 @@ function write (client, request) {
}

if (request.contentLength !== null && request.contentLength !== contentLength) {
throw new ContentLengthMismatchError()
request.onError(new ContentLengthMismatchError())
}

if (!request.onConnect(client[kSocket])) {
return false
}

// TODO: Expect: 100-continue
Expand Down Expand Up @@ -1114,6 +1120,8 @@ function write (client, request) {

client[kWriting] = true
}

return true
}

module.exports = Client
Loading

0 comments on commit c9395eb

Please sign in to comment.