Skip to content

Commit

Permalink
fix: Client.stream writableNeedDrain (#442)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 13, 2020
1 parent b28f49e commit 85f05ed
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
6 changes: 6 additions & 0 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource {
})

this.res = res

const needDrain = res.writableNeedDrain !== undefined
? res.writableNeedDrain
: res._writableState && res._writableState.needDrain

return needDrain !== true
}

onData (chunk) {
Expand Down
61 changes: 58 additions & 3 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -374,19 +374,53 @@ class Parser extends HTTPParser {
this.headers = null
this.shouldKeepAlive = false
this.request = null
this.paused = false

// Parser can't be paused from within a callback.
// Use a buffer in JS land in order to stop further progress while paused.
this.resuming = false
this.queue = []

this._resume = () => {
// TODO: Resume parser.
if (!this.paused || this.resuming) {
return
}

this.paused = false

this.resuming = true
while (this.queue.length) {
const [fn, ...args] = this.queue.shift()

Reflect.apply(fn, this, args)

if (this.paused) {
this.resuming = false
return
}
}
this.resuming = false

socketResume(socket)
}

this._pause = () => {
// TODO: Pause parser.
if (this.paused) {
return
}

this.paused = true

socketPause(socket)
}
}

[HTTPParser.kOnHeaders] (rawHeaders) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders])
return
}

if (this.headers) {
Array.prototype.push.apply(this.headers, rawHeaders)
} else {
Expand All @@ -395,6 +429,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnExecute] (ret) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnExecute], ret])
return
}

const { upgrade, socket } = this

if (!Number.isFinite(ret)) {
Expand Down Expand Up @@ -466,6 +505,12 @@ class Parser extends HTTPParser {

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive])
return
}

const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]
Expand Down Expand Up @@ -561,6 +606,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnBody] (chunk, offset, length) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length])
return
}

const { socket, statusCode, request } = this

if (socket.destroyed) {
Expand All @@ -579,13 +629,19 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnMessageComplete] () {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnMessageComplete]])
return
}

const { client, socket, statusCode, headers, upgrade, request, trailers } = this

if (socket.destroyed) {
return
}

assert(statusCode >= 100)
assert(this.resuming || (socket._handle && socket._handle.reading))

if (upgrade) {
// TODO: When, how and why does this happen?
Expand Down Expand Up @@ -643,7 +699,6 @@ class Parser extends HTTPParser {
// have been queued since then.
util.destroy(socket, new InformationalError('reset'))
} else {
socketResume(socket)
resume(client)
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class Request {

onBody (chunk, offset, length) {
assert(!this.aborted)
assert(!this[kPaused])

if (this[kTimeout] && this[kTimeout].refresh) {
this[kTimeout].refresh()
Expand Down
43 changes: 43 additions & 0 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,46 @@ test('stream body destroyed on invalid callback', (t) => {
}
})
})

test('stream needDrain', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end(Buffer.alloc(4096))
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`https://localhost:${server.address().port}`)
t.tearDown(() => {
client.destroy()
})

const dst = new PassThrough()
dst.pause()

while (dst.write(Buffer.alloc(4096))) {

}

const orgWrite = dst.write
dst.write = () => t.fail()
const p = client.stream({
path: '/',
method: 'GET'
}, () => {
return dst
})

setImmediate(() => {
dst.write = (...args) => {
orgWrite.call(dst, ...args)
}
dst.resume()
})

p.then(() => {
t.pass()
})
})
})

0 comments on commit 85f05ed

Please sign in to comment.