Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: try to re-use timer when possible #729

Merged
merged 18 commits into from
Apr 13, 2021
11 changes: 7 additions & 4 deletions benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ const path = require('path')
const connections = parseInt(process.env.CONNECTIONS, 10) || 50
const parallelRequests = parseInt(process.env.PARALLEL, 10) || 10
const pipelining = parseInt(process.env.PIPELINING, 10) || 10
const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0
const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0

Benchmark.options.minSamples = parseInt(process.env.SAMPLES, 10) || 100

const dest = {}

if (process.env.PORT) {
Expand Down Expand Up @@ -52,8 +57,8 @@ const httpOptionsMultiSocket = {
const undiciOptions = {
path: '/',
method: 'GET',
headersTimeout: 0,
bodyTimeout: 0
headersTimeout,
bodyTimeout
}

const client = new Client(httpOptions.url, {
Expand All @@ -69,8 +74,6 @@ const pool = new Pool(httpOptions.url, {

const suite = new Benchmark.Suite()

// Benchmark.options.minSamples = 200

suite
.add('http - no agent ', {
defer: true,
Expand Down
176 changes: 84 additions & 92 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const {
kTLSServerName,
kTLSSession,
kSetTLSSession,
kConnectTimeout,
kConnectTimeoutValue,
kKeepAliveDefaultTimeout,
kHostHeader,
Expand All @@ -68,8 +67,6 @@ const {
kMaxHeadersSize,
kKeepAliveMaxTimeout,
kKeepAliveTimeoutThreshold,
kIdleTimeout,
kIdleTimeoutValue,
kHeadersTimeout,
kBodyTimeout,
kStrictContentLength
Expand Down Expand Up @@ -163,7 +160,6 @@ class Client extends Dispatcher {
this[kUrl] = util.parseOrigin(url)
this[kSocketPath] = socketPath
this[kConnectTimeoutValue] = connectTimeout == null ? 10e3 : connectTimeout
this[kConnectTimeout] = null
this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
Expand Down Expand Up @@ -231,7 +227,7 @@ class Client extends Dispatcher {
get [kConnected] () {
return (
this[kSocket] &&
this[kSocket].connecting !== true &&
!this[kSocket].pending &&
// Older versions of Node don't set secureConnecting to false.
(this[kSocket].authorized !== false ||
this[kSocket].authorizationError
Expand Down Expand Up @@ -447,6 +443,11 @@ const llhttp = new WebAssembly.Instance(mod, {
}
})

const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
const TIMEOUT_CONNECT = 4

class Parser {
constructor (client, socket) {
assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)
Expand All @@ -458,6 +459,8 @@ class Parser {
this.client = client
this.socket = socket
this.timeout = null
this.timeoutValue = null
this.timeoutType = null
this.statusCode = null
this.upgrade = false
this.headers = []
Expand All @@ -472,6 +475,20 @@ class Parser {
socket.on('data', this.execute)
}

setTimeout (value, type) {
this.timeoutType = type
if (value !== this.timeoutValue) {
clearTimeout(this.timeout)
this.timeout = value ? setTimeout(onParserTimeout, value, this) : null
this.timeoutValue = value
} else if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
this.timeout.refresh()
}
}
}

resume () {
if (this.socket.destroyed || !this.paused) {
return
Expand All @@ -482,6 +499,7 @@ class Parser {

llhttp.exports.llhttp_resume(this.ptr)

assert(this.timeoutType === TIMEOUT_BODY)
if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
Expand Down Expand Up @@ -613,7 +631,7 @@ class Parser {
}

onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
const { client, socket, timeout, headers: rawHeaders } = this
const { client, socket, headers: rawHeaders } = this

/* istanbul ignore next: difficult to make a test case for */
if (socket.destroyed) {
Expand Down Expand Up @@ -641,36 +659,23 @@ class Parser {
return -1
}

const headersTimeout = request.headersTimeout !== undefined
? request.headersTimeout
: client[kHeadersTimeout]
assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

assert(
timeout || // have timeout
!headersTimeout || // no timeout
socket[kWriting], // still writing
'invalid headers timeout state'
)
this.statusCode = statusCode
this.shouldKeepAlive = shouldKeepAlive

if (statusCode >= 200) {
const bodyTimeout = request.bodyTimeout !== undefined
if (this.statusCode >= 200) {
const bodyTimeout = request.bodyTimeout != null
? request.bodyTimeout
: client[kBodyTimeout]

clearTimeout(timeout)
this.timeout = bodyTimeout
? setTimeout(onBodyTimeout, bodyTimeout, this.socket)
: null
} else if (timeout) {
this.setTimeout(bodyTimeout, TIMEOUT_BODY)
} else if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
this.timeout.refresh()
}
}

this.statusCode = statusCode
this.shouldKeepAlive = shouldKeepAlive

if (request.method === 'CONNECT' && statusCode >= 200 && statusCode < 300) {
assert(client[kRunning] === 1)
this.upgrade = true
Expand Down Expand Up @@ -743,7 +748,7 @@ class Parser {
}

onBody (buf) {
const { client, socket, statusCode, timeout } = this
const { client, socket, statusCode } = this

if (socket.destroyed) {
return -1
Expand All @@ -752,7 +757,8 @@ class Parser {
const request = client[kQueue][client[kRunningIdx]]
assert(request)

if (timeout) {
assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
if (this.timeout) {
// istanbul ignore else: only for jest
if (this.timeout.refresh) {
this.timeout.refresh()
Expand All @@ -771,7 +777,7 @@ class Parser {
}

onMessageComplete () {
const { client, socket, statusCode, upgrade, trailers, timeout, headers: rawTrailers } = this
const { client, socket, statusCode, upgrade, trailers, headers: rawTrailers } = this

if (socket.destroyed) {
return -1
Expand All @@ -797,11 +803,6 @@ class Parser {
return
}

if (timeout) {
clearTimeout(timeout)
this.timeout = null
}

for (const trailer of trailers) {
let found = false
for (let n = 0; n < rawTrailers.length; n += 2) {
Expand Down Expand Up @@ -847,6 +848,7 @@ class Parser {

destroy () {
assert(this.ptr != null)
assert(currentParser == null)

llhttp.exports.llhttp_free(this.ptr)
this.ptr = null
Expand All @@ -858,30 +860,41 @@ class Parser {

clearTimeout(this.timeout)
this.timeout = null
this.timeoutValue = null
this.timeoutType = null

this.paused = false

this.socket.removeListener('data', this.execute)
}
}

function onBodyTimeout (socket) {
if (!socket.isPaused()) {
util.destroy(socket, new BodyTimeoutError())
}
}
function onParserTimeout (parser) {
const { socket, timeoutType, client } = parser

function onHeadersTimeout (socket) {
assert(!socket.isPaused(), 'socket cannot be paused while waiting for headers')
util.destroy(socket, new HeadersTimeoutError())
if (timeoutType === TIMEOUT_HEADERS) {
if (!socket[kWriting]) {
assert(!socket.isPaused(), 'socket cannot be paused while waiting for headers')
util.destroy(socket, new HeadersTimeoutError())
}
} else if (timeoutType === TIMEOUT_BODY) {
if (!socket.isPaused()) {
util.destroy(socket, new BodyTimeoutError())
}
} else if (timeoutType === TIMEOUT_IDLE) {
if (client[kRunning] === 0 && client[kKeepAliveTimeoutValue]) {
util.destroy(socket, new InformationalError('socket idle timeout'))
}
} else if (timeoutType === TIMEOUT_CONNECT) {
if (!client[kConnected]) {
util.destroy(socket, new ConnectTimeoutError())
}
}
}

function onSocketConnect () {
const { [kClient]: client } = this

clearTimeout(this[kConnectTimeout])
this[kConnectTimeout] = null

client.emit('connect', client[kUrl], [client])
resume(client)
}
Expand Down Expand Up @@ -919,14 +932,6 @@ function onSocketEnd () {
}

function detachSocket (socket) {
clearTimeout(socket[kConnectTimeout])
socket[kConnectTimeout] = null
socket[kConnectTimeoutValue] = null

clearTimeout(socket[kIdleTimeout])
socket[kIdleTimeout] = null
socket[kIdleTimeoutValue] = null

socket[kParser].destroy()
socket[kParser] = null

Expand Down Expand Up @@ -1022,20 +1027,10 @@ function connect (client) {

client[kSocket] = socket

const parser = new Parser(client, socket)

if (client[kConnectTimeoutValue]) {
socket[kConnectTimeout] = setTimeout((socket) => {
socket.destroy(new ConnectTimeoutError())
}, client[kConnectTimeoutValue], socket)
}

socket[kIdleTimeout] = null
socket[kIdleTimeoutValue] = null
socket[kWriting] = false
socket[kReset] = false
socket[kError] = null
socket[kParser] = parser
socket[kParser] = new Parser(client, socket)
socket[kClient] = client
socket
.setNoDelay(true)
Expand Down Expand Up @@ -1081,34 +1076,23 @@ function _resume (client, sync) {
const socket = client[kSocket]
const connected = client[kConnected]

if (socket && connected) {
const timeout = client[kRunning] > 0 ? 0 : client[kKeepAliveTimeoutValue]

if (socket[kIdleTimeoutValue] !== timeout) {
clearTimeout(socket[kIdleTimeout])
if (timeout) {
socket[kIdleTimeout] = setTimeout((socket) => {
util.destroy(socket, new InformationalError('socket idle timeout'))
}, timeout, socket)
if (socket) {
if (!connected) {
if (socket[kParser].timeoutType !== TIMEOUT_CONNECT) {
socket[kParser].setTimeout(client[kConnectTimeoutValue], TIMEOUT_CONNECT)
}
} else if (client[kSize] === 0) {
if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
}
} else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) {
const request = client[kQueue][client[kRunningIdx]]
const headersTimeout = request.headersTimeout != null
? request.headersTimeout
: client[kHeadersTimeout]
socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
}
socket[kIdleTimeoutValue] = timeout
}
}

if (client[kRunning] > 0) {
const request = client[kQueue][client[kRunningIdx]]

const headersTimeout = request.headersTimeout !== undefined
? request.headersTimeout
: client[kHeadersTimeout]

if (
headersTimeout &&
socket &&
!socket[kWriting] &&
!socket[kParser].timeout
) {
socket[kParser].timeout = setTimeout(onHeadersTimeout, headersTimeout, socket)
}
}

Expand Down Expand Up @@ -1156,7 +1140,7 @@ function _resume (client, sync) {

if (!socket) {
connect(client)
return
continue
}

if (!connected) {
Expand Down Expand Up @@ -1469,6 +1453,14 @@ function write (client, request) {
socket.write('\r\n0\r\n\r\n', 'ascii')
}

assert.strictEqual(socket[kParser].timeoutType, TIMEOUT_HEADERS)
if (socket[kParser].timeout) {
// istanbul ignore else: only for jest
if (socket[kParser].timeout.refresh) {
socket[kParser].timeout.refresh()
}
}

resume(client)
}

Expand Down
5 changes: 1 addition & 4 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ module.exports = {
kResuming: Symbol('resuming'),
kQueue: Symbol('queue'),
kConnect: Symbol('connect'),
kConnectTimeout: Symbol('connect timeout'),
kConnectTimeoutValue: Symbol('connect timeou valuet'),
kIdleTimeout: Symbol('idle timeout'),
kIdleTimeoutValue: Symbol('idle timeout value'),
kConnectTimeoutValue: Symbol('connect timeout value'),
kKeepAliveDefaultTimeout: Symbol('default keep alive timeout'),
kKeepAliveMaxTimeout: Symbol('max keep alive timeout'),
kKeepAliveTimeoutThreshold: Symbol('keep alive timeout threshold'),
Expand Down