Skip to content

Commit

Permalink
refactor: update internal API (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 3, 2020
1 parent b685747 commit 68eef85
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 32 deletions.
4 changes: 2 additions & 2 deletions benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class NoopRequest extends Request {

_onHeaders () {}

_onBody () {}
_onData () {}

_onComplete () {
this.deferred.resolve()
Expand All @@ -186,7 +186,7 @@ class SimpleRequest extends Request {
this.resume = resume
}

_onBody (chunk, offset, length) {
_onData (chunk, offset, length) {
return this.dst.write(chunk.slice(offset, offset + length))
}

Expand Down
17 changes: 13 additions & 4 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@ class PipelineRequest extends Request {
throw new InvalidArgumentError('invalid opts.onTrailers')
}

this.pause = null
this.resume = null
this.callback = callback
this.aborted = false
this.onInfo = opts.onInfo
this.onTrailers = opts.onTrailers
}

_onConnect (pause, resume) {
this.pause = pause
this.resume = resume
}

_onInfo (statusCode, headers) {
if (this.onInfo) {
try {
Expand All @@ -46,22 +53,24 @@ class PipelineRequest extends Request {
}
}

_onHeaders (statusCode, headers, resume) {
_onHeaders (statusCode, headers) {
const { callback } = this

this.callback = null
this.res = callback.call(this, null, {
statusCode,
headers,
opaque: this.opaque,
resume
resume: this.resume
})
}

_onBody (chunk) {
_onData (chunk) {
const { res } = this

return res(null, chunk)
if (!res(null, chunk)) {
this.pause()
}
}

_onComplete (trailers) {
Expand Down
23 changes: 15 additions & 8 deletions lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@ const util = require('./util')
const { kEnqueue } = require('./symbols')

const kRequest = Symbol('request')
const kResume = Symbol('resume')

class RequestResponse extends Readable {
constructor (request, resume) {
constructor (request) {
super({ autoDestroy: true })

this[kResume] = resume
this[kRequest] = request
}

_read () {
this[kResume]()
this[kRequest].resume()
}

_destroy (err, callback) {
Expand Down Expand Up @@ -57,12 +55,19 @@ class RequestRequest extends Request {

super(opts, client)

this.pause = null
this.resume = null
this.callback = callback
this.res = null
this.onInfo = opts.onInfo
this.onTrailers = opts.onTrailers
}

_onConnect (pause, resume) {
this.pause = pause
this.resume = resume
}

_onInfo (statusCode, headers) {
if (this.onInfo) {
try {
Expand All @@ -73,18 +78,20 @@ class RequestRequest extends Request {
}
}

_onHeaders (statusCode, headers, resume) {
_onHeaders (statusCode, headers) {
const { callback, opaque } = this
const body = new RequestResponse(this, resume)
const body = new RequestResponse(this)

this.callback = null
this.res = body

callback(null, { statusCode, headers, opaque, body })
}

_onBody (chunk) {
return this.res.push(chunk)
_onData (chunk) {
if (!this.res.push(chunk)) {
this.pause()
}
}

_onComplete (trailers) {
Expand Down
17 changes: 13 additions & 4 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class StreamRequest extends Request {

super(opts, client)

this.pause = null
this.resume = null
this.factory = factory
this.callback = callback
this.res = null
Expand All @@ -41,6 +43,11 @@ class StreamRequest extends Request {
this.onTrailers = opts.onTrailers
}

_onConnect (pause, resume) {
this.pause = pause
this.resume = resume
}

_onInfo (statusCode, headers) {
const { opaque } = this

Expand All @@ -53,7 +60,7 @@ class StreamRequest extends Request {
}
}

_onHeaders (statusCode, headers, resume) {
_onHeaders (statusCode, headers) {
const { factory, opaque } = this

this.factory = null
Expand Down Expand Up @@ -83,7 +90,7 @@ class StreamRequest extends Request {
return
}

res.on('drain', resume)
res.on('drain', this.resume)
// TODO: Avoid finished. It registers an unecessary amount of listeners.
finished(res, { readable: false }, (err) => {
if (err) {
Expand All @@ -109,10 +116,12 @@ class StreamRequest extends Request {
this.res = res
}

_onBody (chunk) {
_onData (chunk) {
const { res } = this

return res.write(chunk)
if (!res.write(chunk)) {
this.pause()
}
}

_onComplete (trailers) {
Expand Down
14 changes: 10 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const {
const {
kUrl,
kReset,
kPause,
kResume,
kClient,
kParser,
kConnect,
Expand Down Expand Up @@ -390,7 +392,6 @@ class Parser extends HTTPParser {

this.client = client
this.socket = socket
this.resumeSocket = () => socket.resume()

this.statusCode = null
this.upgrade = false
Expand Down Expand Up @@ -461,7 +462,7 @@ class Parser extends HTTPParser {

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
const { client, socket, resumeSocket } = this
const { client, socket } = this
const request = client[kQueue][client[kRunningIdx]]

// TODO: Check for content-length mismatch?
Expand Down Expand Up @@ -509,7 +510,7 @@ class Parser extends HTTPParser {
if (statusCode === 100) {
// TODO: 100 Continue
} else {
request.onHeaders(statusCode, headers, resumeSocket)
request.onHeaders(statusCode, headers)
}

return request.method === 'HEAD' || statusCode < 200 ? 1 : 0
Expand Down Expand Up @@ -660,6 +661,8 @@ function onSocketClose () {
}

function detachSocket (socket) {
socket[kPause] = null
socket[kResume] = null
socket[kClient] = null
socket[kParser] = null
socket[kError] = null
Expand Down Expand Up @@ -704,6 +707,8 @@ function connect (client) {
parser.consume(socket._handle._externalStream)
}

socket[kPause] = socket.pause.bind(socket)
socket[kResume] = socket.resume.bind(socket)
socket[kError] = null
socket[kParser] = parser
socket[kClient] = client
Expand Down Expand Up @@ -889,8 +894,9 @@ function write (client, request) {

const socket = client[kSocket]

socket.setTimeout(client[kSocketTimeout])
request.onConnect(socket[kPause], socket[kResume])

socket.setTimeout(client[kSocketTimeout])
socket.cork()
socket.write(header)

Expand Down
20 changes: 14 additions & 6 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ class Request extends AsyncResource {
: null
}

onConnect (pause, resume) {
this[kResume] = resume

if (this._onConnect) {
this.runInAsyncScope(this._onConnect, this, pause, resume)
}
}

onUpgrade (statusCode, headers, socket) {
assert(this.upgrade || this.method === 'CONNECT')

Expand Down Expand Up @@ -217,7 +225,7 @@ class Request extends AsyncResource {
}
}

onHeaders (statusCode, headers, resume) {
onHeaders (statusCode, headers) {
assert(!this.upgrade && this.method !== 'CONNECT')

if (this.aborted) {
Expand All @@ -233,15 +241,13 @@ class Request extends AsyncResource {
clearTimeout(timeout)
}

this[kResume] = resume

if (statusCode < 200) {
if (this._onInfo) {
this.runInAsyncScope(this._onInfo, this, statusCode, headers)
}
} else {
if (this._onHeaders) {
this.runInAsyncScope(this._onHeaders, this, statusCode, headers, resume)
this.runInAsyncScope(this._onHeaders, this, statusCode, headers)
}
}
}
Expand All @@ -253,11 +259,13 @@ class Request extends AsyncResource {
return null
}

if (!this._onBody) {
if (!this._onData) {
return true
}

return this.runInAsyncScope(this._onBody, this, chunk.slice(offset, offset + length))
this.runInAsyncScope(this._onData, this, chunk.slice(offset, offset + length))

return true
}

onComplete (trailers) {
Expand Down
2 changes: 2 additions & 0 deletions lib/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module.exports = {
kResuming: Symbol('resuming'),
kQueue: Symbol('queue'),
kConnect: Symbol('connect'),
kResume: Symbol('resume'),
kPause: Symbol('pause'),
kSocketTimeout: Symbol('socket timeout'),
kIdleTimeout: Symbol('idle timeout'),
kRequestTimeout: Symbol('request timeout'),
Expand Down
12 changes: 8 additions & 4 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ test('stream response resume back pressure and non standard error', (t) => {
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http:https://localhost:${server.address().port}`)
const client = new Client(`http:https://localhost:${server.address().port}`, {
maxAbortedPayload: 1e5
})
t.tearDown(client.close.bind(client))

const pt = new PassThrough()
client.stream({
path: '/',
method: 'GET',
maxAbortedPayload: 1e5
method: 'GET'
}, () => {
pt.on('data', () => {
pt.emit('error', new Error('kaboom'))
Expand All @@ -177,9 +178,12 @@ test('stream response resume back pressure and non standard error', (t) => {
t.strictEqual(pt.destroyed, true)
})

client.on('disconnect', (err) => {
client.once('disconnect', (err) => {
t.ok(err)
t.pass()
client.on('disconnect', () => {
t.fail()
})
})

client.stream({
Expand Down

0 comments on commit 68eef85

Please sign in to comment.