Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 20, 2020
1 parent adb4fae commit d1dfee2
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 362 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ Options:
a response. Use `0` to disable it entirely.
Default: `30e3` milliseconds (30s).

- `maxAbortedPayload: Number`, the maximum number of bytes read after which an
aborted response will close the connection. Closing the connection
will error other inflight requests in the pipeline.
Default: `1048576` bytes (1MiB).

- `pipelining: Number`, the amount of concurrent requests to be sent over the
single TCP/TLS connection according to
[RFC7230](https://tools.ietf.org/html/rfc7230#section-6.3.2).
Expand Down Expand Up @@ -228,6 +223,10 @@ ee.emit('abort')

Destroying the request or response body will have the same effect.

Aborting a request while active in the pipeline can have significant
performance impact as it will immediatly destroy the socket and the
rest of the active pipeline will be retried.

<a name='stream'></a>
#### `client.stream(opts, factory(data)[, callback(err)]): Promise|Void`

Expand Down Expand Up @@ -431,16 +430,17 @@ Options:
The `handler` parameter is defined as follow:
* `onConnect(resume)`, invoked when request is dispatched on socket.
* `resume(err): Void`, resume `onData` after returning `false` or abort by passing an `Error` through `err`.
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method.
* `statusCode: Number`
* `headers: Array|Null`
* `socket: Duplex`
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
* `onHeaders(statusCode, headers): Void`, invoked when statusCode and headers have been received.
May be invoked multiple times due to 1xx informational headers.
* `statusCode: Number`
* `headers: Array|Null`, an array of key-value pairs. Keys are not automatically lowercased.
* `resume(): Void`, resume `onData` after returning `false`.
* `onData(chunk): Null|Boolean`, invoked when response payload data is received.
* `onData(chunk): Boolean`, invoked when response payload data is received.
* `chunk: Buffer`
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed.
* `trailers: Array|Null`
Expand Down
11 changes: 9 additions & 2 deletions benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ class NoopRequest {
this.deferred = deferred
}

onHeaders () {
onConnect (resume) {

}

onHeaders (statusCode, headers) {

}

Expand All @@ -167,10 +171,13 @@ class SimpleRequest {
this.dst = dst
}

onHeaders (statusCode, headers, resume) {
onConnect (resume) {
this.dst.on('drain', resume)
}

onHeaders (statusCode, headers) {
}

onData (chunk) {
return this.dst.write(chunk)
}
Expand Down
3 changes: 3 additions & 0 deletions lib/client-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class ConnectHandler extends AsyncResource {
this.callback = callback
}

onConnect () {
}

onUpgrade (statusCode, headers, socket) {
const { callback, opaque } = this

Expand Down
35 changes: 19 additions & 16 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class PipelineRequest extends Readable {

class PipelineResponse extends Readable {
constructor (resume) {
super({ autoDestroy: true, read: resume })
super({ autoDestroy: true })
this[kResume] = resume
}

_read () {
this[kResume]()
}

_destroy (err, callback) {
Expand Down Expand Up @@ -77,6 +82,7 @@ class PipelineHandler extends AsyncResource {

this.opaque = opts.opaque || null
this.handler = handler
this.resume = null

this.req = new PipelineRequest()

Expand All @@ -100,12 +106,17 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret } = this
const { body, req, res, ret, resume } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this.resume = null
resume(err)
}

util.destroy(body, err)
util.destroy(req, err)
util.destroy(res, err)
Expand All @@ -122,14 +133,18 @@ class PipelineHandler extends AsyncResource {
this.res = null
}

onHeaders (statusCode, headers, resume) {
onConnect (resume) {
this.resume = resume
}

onHeaders (statusCode, headers) {
const { opaque, handler } = this

if (statusCode < 200) {
return
}

this.res = new PipelineResponse(resume)
this.res = new PipelineResponse(this.resume)

let body
try {
Expand Down Expand Up @@ -180,29 +195,17 @@ class PipelineHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (res._readableState.destroyed) {
return
}

return res.push(chunk)
}

onComplete (trailers) {
const { res } = this

if (res._readableState.destroyed) {
return
}

res.push(null)
}

onError (err) {
const { ret } = this

this.handler = null

util.destroy(ret, err)
}
}
Expand Down
33 changes: 20 additions & 13 deletions lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ const {
const util = require('./util')
const { AsyncResource } = require('async_hooks')

const kResume = Symbol('resume')

class RequestResponse extends Readable {
constructor (resume) {
super({ autoDestroy: true, read: resume })
super({ autoDestroy: true })
this[kResume] = resume
}

_read () {
this[kResume]()
}

_destroy (err, callback) {
Expand All @@ -20,6 +27,10 @@ class RequestResponse extends Readable {
err = new RequestAbortedError()
}

if (err) {
this[kResume](err)
}

callback(err)
}
}
Expand All @@ -39,15 +50,21 @@ class RequestHandler extends AsyncResource {
this.opaque = opts.opaque || null
this.callback = callback
this.res = null
this.resume = null
}

onConnect (resume) {
this.resume = resume
}

onHeaders (statusCode, headers, resume) {
const { callback, opaque } = this
onHeaders (statusCode, headers) {
const { callback, opaque, resume } = this

if (statusCode < 200) {
return
}

this.resume = null
const body = new RequestResponse(resume)

this.callback = null
Expand All @@ -63,21 +80,11 @@ class RequestHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (res._readableState.destroyed) {
return
}

return res.push(chunk)
}

onComplete (trailers) {
const { res } = this

if (res._readableState.destroyed) {
return
}

res.push(null)
}

Expand Down
29 changes: 14 additions & 15 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.resume = null
this.trailers = null
}

onHeaders (statusCode, headers, resume) {
onConnect (resume) {
this.resume = resume
}

onHeaders (statusCode, headers) {
const { factory, opaque } = this

if (statusCode < 200) {
Expand All @@ -54,16 +59,21 @@ class StreamHandler extends AsyncResource {
throw new InvalidReturnValueError('expected Writable')
}

res.on('drain', resume)
res.on('drain', this.resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers } = this
const { callback, res, opaque, trailers, resume } = this

this.res = null
if (err || !res.readable) {
util.destroy(res, err)
}

if (err) {
this.resume = null
resume(err)
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })
})
Expand All @@ -73,23 +83,12 @@ class StreamHandler extends AsyncResource {

onData (chunk) {
const { res } = this

if (util.isDestroyed(res)) {
return
}

return res.write(chunk)
}

onComplete (trailers) {
const { res } = this

if (util.isDestroyed(res)) {
return
}

this.trailers = trailers ? util.parseHeaders(trailers) : {}

res.end()
}

Expand All @@ -101,7 +100,7 @@ class StreamHandler extends AsyncResource {
if (res) {
this.res = null
util.destroy(res, err)
} else {
} else if (callback) {
this.callback = null
this.runInAsyncScope(callback, null, err, { opaque })
}
Expand Down
3 changes: 3 additions & 0 deletions lib/client-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class UpgradeHandler extends AsyncResource {
this.callback = callback
}

onConnect () {
}

onUpgrade (statusCode, headers, socket) {
const { callback, opaque } = this

Expand Down
Loading

0 comments on commit d1dfee2

Please sign in to comment.