Skip to content

Commit

Permalink
feat: dispatch (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 8, 2020
1 parent a327844 commit 1c961cf
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 142 deletions.
42 changes: 38 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ Machine: 2.8GHz AMD EPYC 7402P<br/>
Configuration: Node v14.4, HTTP/1.1 without TLS, 100 connections, Linux 5.4.12-1-lts

```
http - keepalive x 5,521 ops/sec ±3.37% (73 runs sampled)
undici - pipeline x 9,292 ops/sec ±4.28% (79 runs sampled)
undici - request x 11,949 ops/sec ±0.99% (85 runs sampled)
undici - stream x 12,223 ops/sec ±0.76% (85 runs sampled)
http - keepalive x 5,826 ops/sec ±1.45% (275 runs sampled)
undici - pipeline x 7,281 ops/sec ±1.68% (273 runs sampled)
undici - request x 11,700 ops/sec ±0.56% (278 runs sampled)
undici - stream x 12,684 ops/sec ±0.72% (280 runs sampled)
undici - dispatch x 13,446 ops/sec ±0.37% (276 runs sampled)
```

The benchmark is a simple `hello world` [example](benchmarks/index.js).
Expand Down Expand Up @@ -451,6 +452,34 @@ there might still be some progress on dispatched requests.
Returns a promise if no callback is provided.
<a name='dispatch'></a>
#### `client.dispatch(opts, handler): Promise|Void`
This is the low level API which all the preceeding API's are implemented on top of.
Options:
* ... same as [`client.request(opts[, callback])`][request].
The `handler` parameter is defined as follow:
* `onUpgrade(statusCode, headers, socket): Void`, invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method.
* `statusCode: Number`
* `headers: Object`
* `socket: Duplex`
* `onHeaders(statusCode, headers, resume): Void`, invoked when statusCode and headers have been received.
May be invoked multiple times due to 1xx informational headers.
* `statusCode: Number`
* `headers: Object`
* `resume(): Void`, resume `onData` after returning `false`.
* `onData(chunk): Null|Boolean`, invoked when response payload data is received.
* `chunk: Buffer`
* `onComplete(trailers): Void`, invoked when response payload and trailers have been received and the request has completed.
* `trailers: Object`
* `onError(err): Void`, invoked when an error has occured.
* `err: Error`
#### `client.pipelining: Number`
Property to get and set the pipelining factor.
Expand Down Expand Up @@ -528,6 +557,10 @@ Calls [`client.upgrade(opts, callback)`][upgrade] on one of the clients.
Calls [`client.connect(opts, callback)`][connect] on one of the clients.
#### `pool.dispatch(opts, handler): Void`
Calls [`client.dispatch(opts, handler)`][dispatch] on one of the clients.
#### `pool.close([callback]): Promise|Void`
Calls [`client.close(callback)`](#close) on all the clients.
Expand Down Expand Up @@ -604,3 +637,4 @@ MIT
[pipeline]: #pipeline
[upgrade]: #upgrade
[connect]: #connect
[dispatch]: #dispatch
38 changes: 18 additions & 20 deletions benchmarks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ const { Writable } = require('stream')
const http = require('http')
const Benchmark = require('benchmark')
const undici = require('..')
const { kEnqueue, kGetNext } = require('../lib/symbols')
const Request = require('../lib/request')
const { kGetNext } = require('../lib/symbols')

// # Start the h2o server (in h2o repository)
// # Then change the port below to 8080
Expand Down Expand Up @@ -121,7 +120,7 @@ suite
})
}
})
.add('undici - simple', {
.add('undici - dispatch', {
defer: true,
fn: deferred => {
const stream = new Writable({
Expand All @@ -133,51 +132,50 @@ suite
deferred.resolve()
})
const client = pool[kGetNext]()
client[kEnqueue](new SimpleRequest(client, undiciOptions, stream))
client.dispatch(undiciOptions, new SimpleRequest(stream))
}
})
.add('undici - noop', {
defer: true,
fn: deferred => {
const client = pool[kGetNext]()
client[kEnqueue](new NoopRequest(client, undiciOptions, deferred))
client.dispatch(undiciOptions, new NoopRequest(deferred))
}
})
.on('cycle', event => {
console.log(String(event.target))
})
.run()

class NoopRequest extends Request {
constructor (client, opts, deferred) {
super(opts, client)
class NoopRequest {
constructor (deferred) {
this.deferred = deferred
}

_onHeaders () {}
_onHeaders () {

_onData () {}
}

_onComplete () {
_onData (chunk) {
return true
}

_onComplete (trailers) {
this.deferred.resolve()
}
}

class SimpleRequest extends Request {
constructor (client, opts, dst) {
super(opts, client)
class SimpleRequest {
constructor (dst) {
this.dst = dst
this.dst.on('drain', () => {
this.resume()
})
}

_onHeaders (statusCode, headers, resume) {
this.resume = resume
this.dst.on('drain', resume)
}

_onData (chunk, offset, length) {
return this.dst.write(chunk.slice(offset, offset + length))
_onData (chunk) {
return this.dst.write(chunk)
}

_onComplete () {
Expand Down
40 changes: 17 additions & 23 deletions lib/client-connect.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@
'use strict'

const Request = require('./request')
const {
InvalidArgumentError
} = require('./errors')
const { kEnqueue } = require('./symbols')

class ConnectRequest extends Request {
class ConnectRequest {
constructor (client, opts, callback) {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('invalid opts')
}

const {
path,
headers,
servername,
signal,
requestTimeout,
opaque = null
} = opts

super({
path,
method: 'CONNECT',
headers,
servername,
signal,
requestTimeout
}, client)

this.opaque = opaque
this.opaque = opts.opaque || null
this.callback = callback
}

Expand Down Expand Up @@ -70,7 +50,21 @@ module.exports = function connect (client, opts, callback) {
}

try {
client[kEnqueue](new ConnectRequest(client, opts, callback))
const {
path,
headers,
servername,
signal,
requestTimeout
} = opts
client.dispatch({
path,
method: 'CONNECT',
headers,
servername,
signal,
requestTimeout
}, new ConnectRequest(client, opts, callback))
} catch (err) {
process.nextTick(callback, err, null)
}
Expand Down
32 changes: 11 additions & 21 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ const {
InvalidReturnValueError,
RequestAbortedError
} = require('./errors')
const Request = require('./request')
const util = require('./util')
const { kEnqueue } = require('./symbols')
const { AsyncResource } = require('async_hooks')

// TODO: Refactor

const kResume = Symbol('resume')

class PipelineRequest extends Request {
class PipelineRequest extends AsyncResource {
constructor (client, opts, callback) {
super(opts, client)
super('UNDICI_PIPELINE')

if (opts.onInfo && typeof opts.onInfo !== 'function') {
throw new InvalidArgumentError('invalid opts.onInfo')
Expand All @@ -42,13 +41,13 @@ class PipelineRequest extends Request {

if (statusCode < 200) {
if (this.onInfo) {
this.onInfo({ statusCode, headers, opaque })
this.runInAsyncScope(this.onInfo, this, { statusCode, headers, opaque })
}
return
}

this.callback = null
this.res = callback.call(this, null, {
this.res = this.runInAsyncScope(callback, this, null, {
statusCode,
headers,
opaque,
Expand All @@ -59,18 +58,14 @@ class PipelineRequest extends Request {
_onData (chunk) {
const { res } = this

return res ? res(null, chunk) : null
return this.runInAsyncScope(res, null, null, chunk)
}

_onComplete (trailers) {
const { res } = this

if (!res) {
return
}

if (trailers && this.onTrailers) {
this.onTrailers({ trailers, opaque: this.opaque })
this.runInAsyncScope(this.onTrailers, null, { trailers, opaque: this.opaque })
}

res(null, null)
Expand All @@ -81,12 +76,12 @@ class PipelineRequest extends Request {

if (res) {
this.res = null
res(err, null)
this.runInAsyncScope(res, null, err, null)
}

if (callback) {
this.callback = null
callback.call(this, err, null)
this.runInAsyncScope(callback, null, err, null)
}
}
}
Expand Down Expand Up @@ -155,15 +150,10 @@ module.exports = function (client, opts, handler) {
util.destroy(req, err)
util.destroy(res, err)

if (err) {
request.onError(err)
}

request.runInAsyncScope(
callback,
null,
err,
null
err
)
}
}).on('prefinish', () => {
Expand Down Expand Up @@ -270,7 +260,7 @@ module.exports = function (client, opts, handler) {
}
})

client[kEnqueue](request)
client.dispatch(opts, request)

return ret
} catch (err) {
Expand Down
Loading

0 comments on commit 1c961cf

Please sign in to comment.