diff --git a/.eslintrc.yaml b/.eslintrc.yaml deleted file mode 100644 index f3d983b9c..000000000 --- a/.eslintrc.yaml +++ /dev/null @@ -1,19 +0,0 @@ -env: - browser: true - es6: true - mocha: true - node: true -extends: - - eslint:recommended - - plugin:prettier/recommended -parserOptions: - ecmaVersion: latest - sourceType: module -rules: - no-console: off - no-var: error - prefer-const: error - quotes: - - error - - single - - avoidEscape: true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aee196f92..04693fc7e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,8 @@ jobs: - 14 - 16 - 18 - - 19 + - 20 + - 22 os: - macOS-latest - ubuntu-latest @@ -34,15 +35,17 @@ jobs: node: 18 os: windows-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-node@v3 + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 with: node-version: ${{ matrix.node }} architecture: ${{ matrix.arch }} + cache: 'npm' + cache-dependency-path: ./package.json - run: npm install - run: npm run lint if: - matrix.os == 'ubuntu-latest' && matrix.node == 16 && matrix.arch == + matrix.os == 'ubuntu-latest' && matrix.node == 20 && matrix.arch == 'x64' - run: npm test - run: | @@ -51,7 +54,7 @@ jobs: echo "job_id=$id" >> $GITHUB_OUTPUT id: get_job_id shell: bash - - uses: coverallsapp/github-action@1.1.3 + - uses: coverallsapp/github-action@v2 with: flag-name: ${{ steps.get_job_id.outputs.job_id }} (Node.js ${{ matrix.node }} @@ -62,7 +65,7 @@ jobs: needs: test runs-on: ubuntu-latest steps: - - uses: coverallsapp/github-action@1.1.3 + - uses: coverallsapp/github-action@v2 with: github-token: ${{ secrets.GITHUB_TOKEN }} parallel-finished: true diff --git a/FUNDING.json b/FUNDING.json new file mode 100644 index 000000000..043b42fec --- /dev/null +++ b/FUNDING.json @@ -0,0 +1,7 @@ +{ + "drips": { + "ethereum": { + "ownedBy": "0x3D4f997A071d2BA735AC767E68052679423c3dBe" + } + } +} diff --git a/README.md b/README.md index 4539df294..21f10df10 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,8 @@ Passes the quite extensive Autobahn test suite: [server][server-report], [client][client-report]. **Note**: This module does not work in the browser. The client in the docs is a -reference to a back end with the role of a client in the WebSocket -communication. Browser clients must use the native +reference to a backend with the role of a client in the WebSocket communication. +Browser clients must use the native [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) object. To make the same code work seamlessly on Node.js and the browser, you can use one of the many wrappers available on npm, like @@ -23,6 +23,7 @@ can use one of the many wrappers available on npm, like - [Protocol support](#protocol-support) - [Installing](#installing) - [Opt-in for performance](#opt-in-for-performance) + - [Legacy opt-in for performance](#legacy-opt-in-for-performance) - [API docs](#api-docs) - [WebSocket compression](#websocket-compression) - [Usage examples](#usage-examples) @@ -57,27 +58,37 @@ npm install ws ### Opt-in for performance -There are 2 optional modules that can be installed along side with the ws -module. These modules are binary addons that improve the performance of certain -operations. Prebuilt binaries are available for the most popular platforms so -you don't necessarily need to have a C++ compiler installed on your machine. +[bufferutil][] is an optional module that can be installed alongside the ws +module: -- `npm install --save-optional bufferutil`: Allows to efficiently perform - operations such as masking and unmasking the data payload of the WebSocket - frames. -- `npm install --save-optional utf-8-validate`: Allows to efficiently check if a - message contains valid UTF-8. +``` +npm install --save-optional bufferutil +``` + +This is a binary addon that improves the performance of certain operations such +as masking and unmasking the data payload of the WebSocket frames. Prebuilt +binaries are available for the most popular platforms, so you don't necessarily +need to have a C++ compiler installed on your machine. + +To force ws to not use bufferutil, use the +[`WS_NO_BUFFER_UTIL`](./doc/ws.md#ws_no_buffer_util) environment variable. This +can be useful to enhance security in systems where a user can put a package in +the package search path of an application of another user, due to how the +Node.js resolver algorithm works. -To not even try to require and use these modules, use the -[`WS_NO_BUFFER_UTIL`](./doc/ws.md#ws_no_buffer_util) and -[`WS_NO_UTF_8_VALIDATE`](./doc/ws.md#ws_no_utf_8_validate) environment -variables. These might be useful to enhance security in systems where a user can -put a package in the package search path of an application of another user, due -to how the Node.js resolver algorithm works. +#### Legacy opt-in for performance -The `utf-8-validate` module is not needed and is not required, even if it is -already installed, regardless of the value of the `WS_NO_UTF_8_VALIDATE` -environment variable, if [`buffer.isUtf8()`][] is available. +If you are running on an old version of Node.js (prior to v18.14.0), ws also +supports the [utf-8-validate][] module: + +``` +npm install --save-optional utf-8-validate +``` + +This contains a binary polyfill for [`buffer.isUtf8()`][]. + +To force ws not to use utf-8-validate, use the +[`WS_NO_UTF_8_VALIDATE`](./doc/ws.md#ws_no_utf_8_validate) environment variable. ## API docs @@ -135,7 +146,7 @@ const wss = new WebSocketServer({ ``` The client will only use the extension if it is supported and enabled on the -server. To always disable the extension on the client set the +server. To always disable the extension on the client, set the `perMessageDeflate` option to `false`. ```js @@ -155,6 +166,8 @@ import WebSocket from 'ws'; const ws = new WebSocket('ws://www.host.com/path'); +ws.on('error', console.error); + ws.on('open', function open() { ws.send('something'); }); @@ -171,6 +184,8 @@ import WebSocket from 'ws'; const ws = new WebSocket('ws://www.host.com/path'); +ws.on('error', console.error); + ws.on('open', function open() { const array = new Float32Array(5); @@ -190,6 +205,8 @@ import { WebSocketServer } from 'ws'; const wss = new WebSocketServer({ port: 8080 }); wss.on('connection', function connection(ws) { + ws.on('error', console.error); + ws.on('message', function message(data) { console.log('received: %s', data); }); @@ -212,6 +229,8 @@ const server = createServer({ const wss = new WebSocketServer({ server }); wss.on('connection', function connection(ws) { + ws.on('error', console.error); + ws.on('message', function message(data) { console.log('received: %s', data); }); @@ -226,7 +245,6 @@ server.listen(8080); ```js import { createServer } from 'http'; -import { parse } from 'url'; import { WebSocketServer } from 'ws'; const server = createServer(); @@ -234,15 +252,19 @@ const wss1 = new WebSocketServer({ noServer: true }); const wss2 = new WebSocketServer({ noServer: true }); wss1.on('connection', function connection(ws) { + ws.on('error', console.error); + // ... }); wss2.on('connection', function connection(ws) { + ws.on('error', console.error); + // ... }); server.on('upgrade', function upgrade(request, socket, head) { - const { pathname } = parse(request.url); + const { pathname } = new URL(request.url, 'wss://base.url'); if (pathname === '/foo') { wss1.handleUpgrade(request, socket, head, function done(ws) { @@ -266,16 +288,24 @@ server.listen(8080); import { createServer } from 'http'; import { WebSocketServer } from 'ws'; +function onSocketError(err) { + console.error(err); +} + const server = createServer(); const wss = new WebSocketServer({ noServer: true }); wss.on('connection', function connection(ws, request, client) { + ws.on('error', console.error); + ws.on('message', function message(data) { console.log(`Received message ${data} from user ${client}`); }); }); server.on('upgrade', function upgrade(request, socket, head) { + socket.on('error', onSocketError); + // This function is not defined on purpose. Implement it with your own logic. authenticate(request, function next(err, client) { if (err || !client) { @@ -284,6 +314,8 @@ server.on('upgrade', function upgrade(request, socket, head) { return; } + socket.removeListener('error', onSocketError); + wss.handleUpgrade(request, socket, head, function done(ws) { wss.emit('connection', ws, request, client); }); @@ -306,6 +338,8 @@ import WebSocket, { WebSocketServer } from 'ws'; const wss = new WebSocketServer({ port: 8080 }); wss.on('connection', function connection(ws) { + ws.on('error', console.error); + ws.on('message', function message(data, isBinary) { wss.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { @@ -325,6 +359,8 @@ import WebSocket, { WebSocketServer } from 'ws'; const wss = new WebSocketServer({ port: 8080 }); wss.on('connection', function connection(ws) { + ws.on('error', console.error); + ws.on('message', function message(data, isBinary) { wss.clients.forEach(function each(client) { if (client !== ws && client.readyState === WebSocket.OPEN) { @@ -342,6 +378,8 @@ import WebSocket from 'ws'; const ws = new WebSocket('wss://websocket-echo.com/'); +ws.on('error', console.error); + ws.on('open', function open() { console.log('connected'); ws.send(Date.now()); @@ -369,6 +407,8 @@ const ws = new WebSocket('wss://websocket-echo.com/'); const duplex = createWebSocketStream(ws, { encoding: 'utf8' }); +duplex.on('error', console.error); + duplex.pipe(process.stdout); process.stdin.pipe(duplex); ``` @@ -393,6 +433,8 @@ const wss = new WebSocketServer({ port: 8080 }); wss.on('connection', function connection(ws, req) { const ip = req.socket.remoteAddress; + + ws.on('error', console.error); }); ``` @@ -402,16 +444,18 @@ the `X-Forwarded-For` header. ```js wss.on('connection', function connection(ws, req) { const ip = req.headers['x-forwarded-for'].split(',')[0].trim(); + + ws.on('error', console.error); }); ``` ### How to detect and close broken connections? -Sometimes the link between the server and the client can be interrupted in a way -that keeps both the server and the client unaware of the broken state of the +Sometimes, the link between the server and the client can be interrupted in a +way that keeps both the server and the client unaware of the broken state of the connection (e.g. when pulling the cord). -In these cases ping messages can be used as a means to verify that the remote +In these cases, ping messages can be used as a means to verify that the remote endpoint is still responsive. ```js @@ -425,6 +469,7 @@ const wss = new WebSocketServer({ port: 8080 }); wss.on('connection', function connection(ws) { ws.isAlive = true; + ws.on('error', console.error); ws.on('pong', heartbeat); }); @@ -445,7 +490,7 @@ wss.on('close', function close() { Pong messages are automatically sent in response to ping messages as required by the spec. -Just like the server example above your clients might as well lose connection +Just like the server example above, your clients might as well lose connection without knowing it. You might want to add a ping listener on your clients to prevent that. A simple implementation would be: @@ -466,6 +511,7 @@ function heartbeat() { const client = new WebSocket('wss://websocket-echo.com/'); +client.on('error', console.error); client.on('open', heartbeat); client.on('ping', heartbeat); client.on('close', function clear() { @@ -487,6 +533,7 @@ We're using the GitHub [releases][changelog] for changelog entries. [MIT](LICENSE) [`buffer.isutf8()`]: https://nodejs.org/api/buffer.html#bufferisutf8input +[bufferutil]: https://github.com/websockets/bufferutil [changelog]: https://github.com/websockets/ws/releases [client-report]: http://websockets.github.io/ws/autobahn/clients/ [https-proxy-agent]: https://github.com/TooTallNate/node-https-proxy-agent @@ -497,4 +544,5 @@ We're using the GitHub [releases][changelog] for changelog entries. [server-report]: http://websockets.github.io/ws/autobahn/servers/ [session-parse-example]: ./examples/express-session-parse [socks-proxy-agent]: https://github.com/TooTallNate/node-socks-proxy-agent +[utf-8-validate]: https://github.com/websockets/utf-8-validate [ws-server-options]: ./doc/ws.md#new-websocketserveroptions-callback diff --git a/SECURITY.md b/SECURITY.md index 0baf19a63..cbaf84de2 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -12,21 +12,21 @@ blocked instantly. ## Exceptions -If you do not receive an acknowledgement within the said time frame please give +If you do not receive an acknowledgement within the said time frame, please give us the benefit of the doubt as it's possible that we haven't seen it yet. In -this case please send us a message **without details** using one of the +this case, please send us a message **without details** using one of the following methods: - Contact the lead developers of this project on their personal e-mails. You can - find the e-mails in the git logs, for example using the following command: + find the e-mails in the git logs, for example, using the following command: `git --no-pager show -s --format='%an <%ae>' ` where `` is the SHA1 of their latest commit in the project. - Create a GitHub issue stating contact details and the severity of the issue. -Once we have acknowledged receipt of your report and confirmed the bug ourselves -we will work with you to fix the vulnerability and publicly acknowledge your -responsible disclosure, if you wish. In addition to that we will create and -publish a security advisory to +Once we have acknowledged receipt of your report and confirmed the bug +ourselves, we will work with you to fix the vulnerability and publicly +acknowledge your responsible disclosure, if you wish. In addition to that, we +will create and publish a security advisory to [GitHub Security Advisories](https://github.com/websockets/ws/security/advisories?state=published). ## History diff --git a/doc/ws.md b/doc/ws.md index dd51eca9c..1189fd02a 100644 --- a/doc/ws.md +++ b/doc/ws.md @@ -72,6 +72,12 @@ This class represents a WebSocket server. It extends the `EventEmitter`. ### new WebSocketServer(options[, callback]) - `options` {Object} + - `autoPong` {Boolean} Specifies whether or not to automatically send a pong + in response to a ping. Defaults to `true`. + - `allowSynchronousEvents` {Boolean} Specifies whether any of the `'message'`, + `'ping'`, and `'pong'` events can be emitted multiple times in the same + tick. Defaults to `true`. Setting it to `false` improves compatibility with + the WHATWG standardbut may negatively impact performance. - `backlog` {Number} The maximum length of the queue of pending connections. - `clientTracking` {Boolean} Specifies whether or not to track clients. - `handleProtocols` {Function} A function which can be used to handle the @@ -97,7 +103,7 @@ This class represents a WebSocket server. It extends the `EventEmitter`. Create a new server instance. One and only one of `port`, `server` or `noServer` must be provided or an error is thrown. An HTTP server is automatically created, started, and used if `port` is set. To use an external HTTP/S server instead, -specify only `server` or `noServer`. In this case the HTTP/S server must be +specify only `server` or `noServer`. In this case, the HTTP/S server must be started manually. The "noServer" mode allows the WebSocket server to be completely detached from the HTTP/S server. This makes it possible, for example, to share a single HTTP/S server between multiple WebSocket servers. @@ -106,8 +112,8 @@ to share a single HTTP/S server between multiple WebSocket servers. > authentication in the `'upgrade'` event of the HTTP server. See examples for > more details. -If `verifyClient` is not set then the handshake is automatically accepted. If it -has a single parameter then `ws` will invoke it with the following argument: +If `verifyClient` is not set, then the handshake is automatically accepted. If +it has a single parameter, then `ws` will invoke it with the following argument: - `info` {Object} - `origin` {String} The value in the Origin header indicated by the client. @@ -118,19 +124,19 @@ has a single parameter then `ws` will invoke it with the following argument: The return value (`Boolean`) of the function determines whether or not to accept the handshake. -If `verifyClient` has two parameters then `ws` will invoke it with the following -arguments: +If `verifyClient` has two parameters, then `ws` will invoke it with the +following arguments: - `info` {Object} Same as above. - `cb` {Function} A callback that must be called by the user upon inspection of the `info` fields. Arguments in this callback are: - `result` {Boolean} Whether or not to accept the handshake. - - `code` {Number} When `result` is `false` this field determines the HTTP + - `code` {Number} When `result` is `false`, this field determines the HTTP error status code to be sent to the client. - - `name` {String} When `result` is `false` this field determines the HTTP + - `name` {String} When `result` is `false`, this field determines the HTTP reason phrase. - - `headers` {Object} When `result` is `false` this field determines additional - HTTP headers to be sent to the client. For example, + - `headers` {Object} When `result` is `false`, this field determines + additional HTTP headers to be sent to the client. For example, `{ 'Retry-After': 120 }`. `handleProtocols` takes two arguments: @@ -140,15 +146,15 @@ arguments: - `request` {http.IncomingMessage} The client HTTP GET request. The returned value sets the value of the `Sec-WebSocket-Protocol` header in the -HTTP 101 response. If returned value is `false` the header is not added in the +HTTP 101 response. If returned value is `false`, the header is not added in the response. -If `handleProtocols` is not set then the first of the client's requested +If `handleProtocols` is not set, then the first of the client's requested subprotocols is used. `perMessageDeflate` can be used to control the behavior of [permessage-deflate extension][permessage-deflate]. The extension is disabled when `false` (default -value). If an object is provided then that is extension parameters: +value). If an object is provided, then that is extension parameters: - `serverNoContextTakeover` {Boolean} Whether to use context takeover or not. - `clientNoContextTakeover` {Boolean} Acknowledge disabling of client context @@ -165,8 +171,8 @@ value). If an object is provided then that is extension parameters: above this limit will be queued. Default 10. You usually won't need to touch this option. See [this issue][concurrency-limit] for more details. -If a property is empty then either an offered configuration or a default value -is used. When sending a fragmented message the length of the first fragment is +If a property is empty, then either an offered configuration or a default value +is used. When sending a fragmented message, the length of the first fragment is compared to the threshold. This determines if compression is used for the entire message. @@ -242,15 +248,14 @@ created internally. If an external HTTP server is used via the `server` or `noServer` constructor options, it must be closed manually. Existing connections are not closed automatically. The server emits a `'close'` event when all connections are closed unless an external HTTP server is used and client -tracking is disabled. In this case the `'close'` event is emitted in the next +tracking is disabled. In this case, the `'close'` event is emitted in the next tick. The optional callback is called when the `'close'` event occurs and receives an `Error` if the server is already closed. ### server.handleUpgrade(request, socket, head, callback) - `request` {http.IncomingMessage} The client HTTP GET request. -- `socket` {net.Socket|tls.Socket} The network socket between the server and - client. +- `socket` {stream.Duplex} The network socket between the server and client. - `head` {Buffer} The first packet of the upgraded stream. - `callback` {Function}. @@ -268,7 +273,7 @@ If the upgrade is successful, the `callback` is called with two arguments: - `request` {http.IncomingMessage} The client HTTP GET request. -See if a given request should be handled by this server. By default this method +See if a given request should be handled by this server. By default, this method validates the pathname of the request, matching it against the `path` option if provided. The return value, `true` or `false`, determines whether or not to accept the handshake. @@ -293,11 +298,19 @@ This class represents a WebSocket. It extends the `EventEmitter`. - `address` {String|url.URL} The URL to which to connect. - `protocols` {String|Array} The list of subprotocols. - `options` {Object} + - `autoPong` {Boolean} Specifies whether or not to automatically send a pong + in response to a ping. Defaults to `true`. + - `allowSynchronousEvents` {Boolean} Specifies whether any of the `'message'`, + `'ping'`, and `'pong'` events can be emitted multiple times in the same + tick. Defaults to `true`. Setting it to `false` improves compatibility with + the WHATWG standardbut may negatively impact performance. + - `finishRequest` {Function} A function which can be used to customize the + headers of each HTTP request before it is sent. See description below. - `followRedirects` {Boolean} Whether or not to follow redirects. Defaults to `false`. - `generateMask` {Function} The function used to generate the masking key. It takes a `Buffer` that must be filled synchronously and is called before a - message is sent, for each message. By default the buffer is filled with + message is sent, for each message. By default, the buffer is filled with cryptographically strong random bytes. - `handshakeTimeout` {Number} Timeout in milliseconds for the handshake request. This is reset after every redirection. @@ -316,12 +329,24 @@ This class represents a WebSocket. It extends the `EventEmitter`. Options given do not have any effect if parsed from the URL given with the `address` parameter. +Create a new WebSocket instance. + `perMessageDeflate` default value is `true`. When using an object, parameters are the same of the server. The only difference is the direction of requests. For example, `serverNoContextTakeover` can be used to ask the server to disable context takeover. -Create a new WebSocket instance. +`finishRequest` is called with arguments + +- `request` {http.ClientRequest} +- `websocket` {WebSocket} + +for each HTTP GET request (the initial one and any caused by redirects) when it +is ready to be sent, to allow for last minute customization of the headers. If +`finishRequest` is set, then it has the responsibility to call `request.end()` +once it is done setting request headers. This is intended for niche use-cases +where some headers can't be provided in advance e.g. because they depend on the +underlying socket. #### IPC connections @@ -384,13 +409,13 @@ Emitted when the connection is established. - `data` {Buffer} -Emitted when a ping is received from the server. +Emitted when a ping is received. ### Event: 'pong' - `data` {Buffer} -Emitted when a pong is received from the server. +Emitted when a pong is received. ### Event: 'redirect' @@ -454,7 +479,7 @@ The number of bytes of data that have been queued using calls to `send()` but not yet transmitted to the network. This deviates from the HTML standard in the following ways: -1. If the data is immediately sent the value is `0`. +1. If the data is immediately sent, the value is `0`. 1. All framing bytes are included. ### websocket.close([code[, reason]]) @@ -495,8 +520,8 @@ An event listener to be called when an error occurs. The listener receives an - {Function} -An event listener to be called when a message is received from the server. The -listener receives a `MessageEvent` named "message". +An event listener to be called when a message is received. The listener receives +a `MessageEvent` named "message". ### websocket.onopen @@ -585,7 +610,7 @@ state is `CONNECTING`. ### websocket.terminate() -Forcibly close the connection. Internally this calls [`socket.destroy()`][]. +Forcibly close the connection. Internally, this calls [`socket.destroy()`][]. ### websocket.url @@ -606,12 +631,12 @@ given `WebSocket`. ### WS_NO_BUFFER_UTIL -When set to a non empty value, prevents the optional `bufferutil` dependency +When set to a non-empty value, prevents the optional `bufferutil` dependency from being required. ### WS_NO_UTF_8_VALIDATE -When set to a non empty value, prevents the optional `utf-8-validate` dependency +When set to a non-empty value, prevents the optional `utf-8-validate` dependency from being required. ## Error codes diff --git a/eslint.config.js b/eslint.config.js new file mode 100644 index 000000000..4e685b9ad --- /dev/null +++ b/eslint.config.js @@ -0,0 +1,28 @@ +'use strict'; + +const pluginPrettierRecommended = require('eslint-plugin-prettier/recommended'); +const globals = require('globals'); +const js = require('@eslint/js'); + +module.exports = [ + js.configs.recommended, + { + ignores: ['.nyc_output/', '.vscode/', 'coverage/', 'node_modules/'], + languageOptions: { + ecmaVersion: 'latest', + globals: { + ...globals.browser, + ...globals.mocha, + ...globals.node + }, + sourceType: 'module' + }, + rules: { + 'no-console': 'off', + 'no-unused-vars': ['error', { caughtErrors: 'none' }], + 'no-var': 'error', + 'prefer-const': 'error' + } + }, + pluginPrettierRecommended +]; diff --git a/examples/express-session-parse/index.js b/examples/express-session-parse/index.js index b62a2e4a5..e0f214406 100644 --- a/examples/express-session-parse/index.js +++ b/examples/express-session-parse/index.js @@ -7,6 +7,10 @@ const uuid = require('uuid'); const { WebSocketServer } = require('../..'); +function onSocketError(err) { + console.error(err); +} + const app = express(); const map = new Map(); @@ -59,6 +63,8 @@ const server = http.createServer(app); const wss = new WebSocketServer({ clientTracking: false, noServer: true }); server.on('upgrade', function (request, socket, head) { + socket.on('error', onSocketError); + console.log('Parsing session from request...'); sessionParser(request, {}, () => { @@ -70,6 +76,8 @@ server.on('upgrade', function (request, socket, head) { console.log('Session is parsed!'); + socket.removeListener('error', onSocketError); + wss.handleUpgrade(request, socket, head, function (ws) { wss.emit('connection', ws, request); }); @@ -81,6 +89,8 @@ wss.on('connection', function (ws, request) { map.set(userId, ws); + ws.on('error', console.error); + ws.on('message', function (message) { // // Here we can now use session parameters. diff --git a/examples/server-stats/index.js b/examples/server-stats/index.js index e8754b5b2..afab8363f 100644 --- a/examples/server-stats/index.js +++ b/examples/server-stats/index.js @@ -22,6 +22,8 @@ wss.on('connection', function (ws) { }, 100); console.log('started client interval'); + ws.on('error', console.error); + ws.on('close', function () { console.log('stopping client interval'); clearInterval(id); diff --git a/examples/ssl.js b/examples/ssl.js index a5e750b79..83fb5f280 100644 --- a/examples/ssl.js +++ b/examples/ssl.js @@ -13,6 +13,8 @@ const server = https.createServer({ const wss = new WebSocketServer({ server }); wss.on('connection', function connection(ws) { + ws.on('error', console.error); + ws.on('message', function message(msg) { console.log(msg.toString()); }); @@ -31,6 +33,8 @@ server.listen(function listening() { rejectUnauthorized: false }); + ws.on('error', console.error); + ws.on('open', function open() { ws.send('All glory to WebSockets!'); }); diff --git a/lib/receiver.js b/lib/receiver.js index 96f572cb1..70dfd9933 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -13,12 +13,14 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util'); const { isValidStatusCode, isValidUTF8 } = require('./validation'); const FastBuffer = Buffer[Symbol.species]; + const GET_INFO = 0; const GET_PAYLOAD_LENGTH_16 = 1; const GET_PAYLOAD_LENGTH_64 = 2; const GET_MASK = 3; const GET_DATA = 4; const INFLATING = 5; +const DEFER_EVENT = 6; /** * HyBi Receiver implementation. @@ -30,6 +32,9 @@ class Receiver extends Writable { * Creates a Receiver instance. * * @param {Object} [options] Options object + * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether + * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted + * multiple times in the same tick * @param {String} [options.binaryType=nodebuffer] The type for binary data * @param {Object} [options.extensions] An object containing the negotiated * extensions @@ -42,6 +47,10 @@ class Receiver extends Writable { constructor(options = {}) { super(); + this._allowSynchronousEvents = + options.allowSynchronousEvents !== undefined + ? options.allowSynchronousEvents + : true; this._binaryType = options.binaryType || BINARY_TYPES[0]; this._extensions = options.extensions || {}; this._isServer = !!options.isServer; @@ -64,8 +73,9 @@ class Receiver extends Writable { this._messageLength = 0; this._fragments = []; - this._state = GET_INFO; + this._errored = false; this._loop = false; + this._state = GET_INFO; } /** @@ -137,43 +147,42 @@ class Receiver extends Writable { * @private */ startLoop(cb) { - let err; this._loop = true; do { switch (this._state) { case GET_INFO: - err = this.getInfo(); + this.getInfo(cb); break; case GET_PAYLOAD_LENGTH_16: - err = this.getPayloadLength16(); + this.getPayloadLength16(cb); break; case GET_PAYLOAD_LENGTH_64: - err = this.getPayloadLength64(); + this.getPayloadLength64(cb); break; case GET_MASK: this.getMask(); break; case GET_DATA: - err = this.getData(cb); + this.getData(cb); break; - default: - // `INFLATING` + case INFLATING: + case DEFER_EVENT: this._loop = false; return; } } while (this._loop); - cb(err); + if (!this._errored) cb(); } /** * Reads the first two bytes of a frame. * - * @return {(RangeError|undefined)} A possible error + * @param {Function} cb Callback * @private */ - getInfo() { + getInfo(cb) { if (this._bufferedBytes < 2) { this._loop = false; return; @@ -182,27 +191,31 @@ class Receiver extends Writable { const buf = this.consume(2); if ((buf[0] & 0x30) !== 0x00) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'RSV2 and RSV3 must be clear', true, 1002, 'WS_ERR_UNEXPECTED_RSV_2_3' ); + + cb(error); + return; } const compressed = (buf[0] & 0x40) === 0x40; if (compressed && !this._extensions[PerMessageDeflate.extensionName]) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'RSV1 must be clear', true, 1002, 'WS_ERR_UNEXPECTED_RSV_1' ); + + cb(error); + return; } this._fin = (buf[0] & 0x80) === 0x80; @@ -211,86 +224,100 @@ class Receiver extends Writable { if (this._opcode === 0x00) { if (compressed) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'RSV1 must be clear', true, 1002, 'WS_ERR_UNEXPECTED_RSV_1' ); + + cb(error); + return; } if (!this._fragmented) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'invalid opcode 0', true, 1002, 'WS_ERR_INVALID_OPCODE' ); + + cb(error); + return; } this._opcode = this._fragmented; } else if (this._opcode === 0x01 || this._opcode === 0x02) { if (this._fragmented) { - this._loop = false; - return error( + const error = this.createError( RangeError, `invalid opcode ${this._opcode}`, true, 1002, 'WS_ERR_INVALID_OPCODE' ); + + cb(error); + return; } this._compressed = compressed; } else if (this._opcode > 0x07 && this._opcode < 0x0b) { if (!this._fin) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'FIN must be set', true, 1002, 'WS_ERR_EXPECTED_FIN' ); + + cb(error); + return; } if (compressed) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'RSV1 must be clear', true, 1002, 'WS_ERR_UNEXPECTED_RSV_1' ); + + cb(error); + return; } if ( this._payloadLength > 0x7d || (this._opcode === 0x08 && this._payloadLength === 1) ) { - this._loop = false; - return error( + const error = this.createError( RangeError, `invalid payload length ${this._payloadLength}`, true, 1002, 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH' ); + + cb(error); + return; } } else { - this._loop = false; - return error( + const error = this.createError( RangeError, `invalid opcode ${this._opcode}`, true, 1002, 'WS_ERR_INVALID_OPCODE' ); + + cb(error); + return; } if (!this._fin && !this._fragmented) this._fragmented = this._opcode; @@ -298,54 +325,58 @@ class Receiver extends Writable { if (this._isServer) { if (!this._masked) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'MASK must be set', true, 1002, 'WS_ERR_EXPECTED_MASK' ); + + cb(error); + return; } } else if (this._masked) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'MASK must be clear', true, 1002, 'WS_ERR_UNEXPECTED_MASK' ); + + cb(error); + return; } if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16; else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64; - else return this.haveLength(); + else this.haveLength(cb); } /** * Gets extended payload length (7+16). * - * @return {(RangeError|undefined)} A possible error + * @param {Function} cb Callback * @private */ - getPayloadLength16() { + getPayloadLength16(cb) { if (this._bufferedBytes < 2) { this._loop = false; return; } this._payloadLength = this.consume(2).readUInt16BE(0); - return this.haveLength(); + this.haveLength(cb); } /** * Gets extended payload length (7+64). * - * @return {(RangeError|undefined)} A possible error + * @param {Function} cb Callback * @private */ - getPayloadLength64() { + getPayloadLength64(cb) { if (this._bufferedBytes < 8) { this._loop = false; return; @@ -359,38 +390,42 @@ class Receiver extends Writable { // if payload length is greater than this number. // if (num > Math.pow(2, 53 - 32) - 1) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'Unsupported WebSocket frame: payload length > 2^53 - 1', false, 1009, 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH' ); + + cb(error); + return; } this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4); - return this.haveLength(); + this.haveLength(cb); } /** * Payload length has been read. * - * @return {(RangeError|undefined)} A possible error + * @param {Function} cb Callback * @private */ - haveLength() { + haveLength(cb) { if (this._payloadLength && this._opcode < 0x08) { this._totalPayloadLength += this._payloadLength; if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) { - this._loop = false; - return error( + const error = this.createError( RangeError, 'Max payload size exceeded', false, 1009, 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH' ); + + cb(error); + return; } } @@ -417,7 +452,6 @@ class Receiver extends Writable { * Reads data bytes. * * @param {Function} cb Callback - * @return {(Error|RangeError|undefined)} A possible error * @private */ getData(cb) { @@ -439,7 +473,10 @@ class Receiver extends Writable { } } - if (this._opcode > 0x07) return this.controlMessage(data); + if (this._opcode > 0x07) { + this.controlMessage(data, cb); + return; + } if (this._compressed) { this._state = INFLATING; @@ -456,7 +493,7 @@ class Receiver extends Writable { this._fragments.push(data); } - return this.dataMessage(); + this.dataMessage(cb); } /** @@ -475,74 +512,96 @@ class Receiver extends Writable { if (buf.length) { this._messageLength += buf.length; if (this._messageLength > this._maxPayload && this._maxPayload > 0) { - return cb( - error( - RangeError, - 'Max payload size exceeded', - false, - 1009, - 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH' - ) + const error = this.createError( + RangeError, + 'Max payload size exceeded', + false, + 1009, + 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH' ); + + cb(error); + return; } this._fragments.push(buf); } - const er = this.dataMessage(); - if (er) return cb(er); - - this.startLoop(cb); + this.dataMessage(cb); + if (this._state === GET_INFO) this.startLoop(cb); }); } /** * Handles a data message. * - * @return {(Error|undefined)} A possible error + * @param {Function} cb Callback * @private */ - dataMessage() { - if (this._fin) { - const messageLength = this._messageLength; - const fragments = this._fragments; - - this._totalPayloadLength = 0; - this._messageLength = 0; - this._fragmented = 0; - this._fragments = []; - - if (this._opcode === 2) { - let data; - - if (this._binaryType === 'nodebuffer') { - data = concat(fragments, messageLength); - } else if (this._binaryType === 'arraybuffer') { - data = toArrayBuffer(concat(fragments, messageLength)); - } else { - data = fragments; - } + dataMessage(cb) { + if (!this._fin) { + this._state = GET_INFO; + return; + } + + const messageLength = this._messageLength; + const fragments = this._fragments; + + this._totalPayloadLength = 0; + this._messageLength = 0; + this._fragmented = 0; + this._fragments = []; + if (this._opcode === 2) { + let data; + + if (this._binaryType === 'nodebuffer') { + data = concat(fragments, messageLength); + } else if (this._binaryType === 'arraybuffer') { + data = toArrayBuffer(concat(fragments, messageLength)); + } else { + data = fragments; + } + + if (this._allowSynchronousEvents) { this.emit('message', data, true); + this._state = GET_INFO; } else { - const buf = concat(fragments, messageLength); + this._state = DEFER_EVENT; + setImmediate(() => { + this.emit('message', data, true); + this._state = GET_INFO; + this.startLoop(cb); + }); + } + } else { + const buf = concat(fragments, messageLength); - if (!this._skipUTF8Validation && !isValidUTF8(buf)) { - this._loop = false; - return error( - Error, - 'invalid UTF-8 sequence', - true, - 1007, - 'WS_ERR_INVALID_UTF8' - ); - } + if (!this._skipUTF8Validation && !isValidUTF8(buf)) { + const error = this.createError( + Error, + 'invalid UTF-8 sequence', + true, + 1007, + 'WS_ERR_INVALID_UTF8' + ); + cb(error); + return; + } + + if (this._state === INFLATING || this._allowSynchronousEvents) { this.emit('message', buf, false); + this._state = GET_INFO; + } else { + this._state = DEFER_EVENT; + setImmediate(() => { + this.emit('message', buf, false); + this._state = GET_INFO; + this.startLoop(cb); + }); } } - - this._state = GET_INFO; } /** @@ -552,24 +611,26 @@ class Receiver extends Writable { * @return {(Error|RangeError|undefined)} A possible error * @private */ - controlMessage(data) { + controlMessage(data, cb) { if (this._opcode === 0x08) { - this._loop = false; - if (data.length === 0) { + this._loop = false; this.emit('conclude', 1005, EMPTY_BUFFER); this.end(); } else { const code = data.readUInt16BE(0); if (!isValidStatusCode(code)) { - return error( + const error = this.createError( RangeError, `invalid status code ${code}`, true, 1002, 'WS_ERR_INVALID_CLOSE_CODE' ); + + cb(error); + return; } const buf = new FastBuffer( @@ -579,49 +640,65 @@ class Receiver extends Writable { ); if (!this._skipUTF8Validation && !isValidUTF8(buf)) { - return error( + const error = this.createError( Error, 'invalid UTF-8 sequence', true, 1007, 'WS_ERR_INVALID_UTF8' ); + + cb(error); + return; } + this._loop = false; this.emit('conclude', code, buf); this.end(); } - } else if (this._opcode === 0x09) { - this.emit('ping', data); + + this._state = GET_INFO; + return; + } + + if (this._allowSynchronousEvents) { + this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data); + this._state = GET_INFO; } else { - this.emit('pong', data); + this._state = DEFER_EVENT; + setImmediate(() => { + this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data); + this._state = GET_INFO; + this.startLoop(cb); + }); } + } - this._state = GET_INFO; + /** + * Builds an error object. + * + * @param {function(new:Error|RangeError)} ErrorCtor The error constructor + * @param {String} message The error message + * @param {Boolean} prefix Specifies whether or not to add a default prefix to + * `message` + * @param {Number} statusCode The status code + * @param {String} errorCode The exposed error code + * @return {(Error|RangeError)} The error + * @private + */ + createError(ErrorCtor, message, prefix, statusCode, errorCode) { + this._loop = false; + this._errored = true; + + const err = new ErrorCtor( + prefix ? `Invalid WebSocket frame: ${message}` : message + ); + + Error.captureStackTrace(err, this.createError); + err.code = errorCode; + err[kStatusCode] = statusCode; + return err; } } module.exports = Receiver; - -/** - * Builds an error object. - * - * @param {function(new:Error|RangeError)} ErrorCtor The error constructor - * @param {String} message The error message - * @param {Boolean} prefix Specifies whether or not to add a default prefix to - * `message` - * @param {Number} statusCode The status code - * @param {String} errorCode The exposed error code - * @return {(Error|RangeError)} The error - * @private - */ -function error(ErrorCtor, message, prefix, statusCode, errorCode) { - const err = new ErrorCtor( - prefix ? `Invalid WebSocket frame: ${message}` : message - ); - - Error.captureStackTrace(err, error); - err.code = errorCode; - err[kStatusCode] = statusCode; - return err; -} diff --git a/lib/sender.js b/lib/sender.js index c84885362..c81ec66f6 100644 --- a/lib/sender.js +++ b/lib/sender.js @@ -1,9 +1,8 @@ -/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */ +/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex" }] */ 'use strict'; -const net = require('net'); -const tls = require('tls'); +const { Duplex } = require('stream'); const { randomFillSync } = require('crypto'); const PerMessageDeflate = require('./permessage-deflate'); @@ -13,6 +12,9 @@ const { mask: applyMask, toBuffer } = require('./buffer-util'); const kByteLength = Symbol('kByteLength'); const maskBuffer = Buffer.alloc(4); +const RANDOM_POOL_SIZE = 8 * 1024; +let randomPool; +let randomPoolPointer = RANDOM_POOL_SIZE; /** * HyBi Sender implementation. @@ -21,7 +23,7 @@ class Sender { /** * Creates a Sender instance. * - * @param {(net.Socket|tls.Socket)} socket The connection socket + * @param {Duplex} socket The connection socket * @param {Object} [extensions] An object containing the negotiated extensions * @param {Function} [generateMask] The function used to generate the masking * key @@ -77,7 +79,24 @@ class Sender { if (options.generateMask) { options.generateMask(mask); } else { - randomFillSync(mask, 0, 4); + if (randomPoolPointer === RANDOM_POOL_SIZE) { + /* istanbul ignore else */ + if (randomPool === undefined) { + // + // This is lazily initialized because server-sent frames must not + // be masked so it may never be used. + // + randomPool = Buffer.alloc(RANDOM_POOL_SIZE); + } + + randomFillSync(randomPool, 0, RANDOM_POOL_SIZE); + randomPoolPointer = 0; + } + + mask[0] = randomPool[randomPoolPointer++]; + mask[1] = randomPool[randomPoolPointer++]; + mask[2] = randomPool[randomPoolPointer++]; + mask[3] = randomPool[randomPoolPointer++]; } skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0; diff --git a/lib/websocket-server.js b/lib/websocket-server.js index bac30eb33..67b52ffdd 100644 --- a/lib/websocket-server.js +++ b/lib/websocket-server.js @@ -1,12 +1,10 @@ -/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls|https$" }] */ +/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex$", "caughtErrors": "none" }] */ 'use strict'; const EventEmitter = require('events'); const http = require('http'); -const https = require('https'); -const net = require('net'); -const tls = require('tls'); +const { Duplex } = require('stream'); const { createHash } = require('crypto'); const extension = require('./extension'); @@ -31,6 +29,11 @@ class WebSocketServer extends EventEmitter { * Create a `WebSocketServer` instance. * * @param {Object} options Configuration options + * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether + * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted + * multiple times in the same tick + * @param {Boolean} [options.autoPong=true] Specifies whether or not to + * automatically send a pong in response to a ping * @param {Number} [options.backlog=511] The maximum length of the queue of * pending connections * @param {Boolean} [options.clientTracking=true] Specifies whether or not to @@ -57,6 +60,8 @@ class WebSocketServer extends EventEmitter { super(); options = { + allowSynchronousEvents: true, + autoPong: true, maxPayload: 100 * 1024 * 1024, skipUTF8Validation: false, perMessageDeflate: false, @@ -221,8 +226,7 @@ class WebSocketServer extends EventEmitter { * Handle a HTTP Upgrade request. * * @param {http.IncomingMessage} req The request object - * @param {(net.Socket|tls.Socket)} socket The network socket between the - * server and client + * @param {Duplex} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Function} cb Callback * @public @@ -231,6 +235,7 @@ class WebSocketServer extends EventEmitter { socket.on('error', socketOnError); const key = req.headers['sec-websocket-key']; + const upgrade = req.headers.upgrade; const version = +req.headers['sec-websocket-version']; if (req.method !== 'GET') { @@ -239,13 +244,13 @@ class WebSocketServer extends EventEmitter { return; } - if (req.headers.upgrade.toLowerCase() !== 'websocket') { + if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') { const message = 'Invalid Upgrade header'; abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); return; } - if (!key || !keyRegex.test(key)) { + if (key === undefined || !keyRegex.test(key)) { const message = 'Missing or invalid Sec-WebSocket-Key header'; abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); return; @@ -346,8 +351,7 @@ class WebSocketServer extends EventEmitter { * @param {String} key The value of the `Sec-WebSocket-Key` header * @param {Set} protocols The subprotocols * @param {http.IncomingMessage} req The request object - * @param {(net.Socket|tls.Socket)} socket The network socket between the - * server and client + * @param {Duplex} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Function} cb Callback * @throws {Error} If called more than once with the same socket @@ -379,7 +383,7 @@ class WebSocketServer extends EventEmitter { `Sec-WebSocket-Accept: ${digest}` ]; - const ws = new this.options.WebSocket(null); + const ws = new this.options.WebSocket(null, undefined, this.options); if (protocols.size) { // @@ -413,6 +417,7 @@ class WebSocketServer extends EventEmitter { socket.removeListener('error', socketOnError); ws.setSocket(socket, head, { + allowSynchronousEvents: this.options.allowSynchronousEvents, maxPayload: this.options.maxPayload, skipUTF8Validation: this.options.skipUTF8Validation }); @@ -477,7 +482,7 @@ function socketOnError() { /** * Close the connection when preconditions are not fulfilled. * - * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request + * @param {Duplex} socket The socket of the upgrade request * @param {Number} code The HTTP response status code * @param {String} [message] The HTTP response body * @param {Object} [headers] Additional HTTP response headers @@ -518,7 +523,7 @@ function abortHandshake(socket, code, message, headers) { * * @param {WebSocketServer} server The WebSocket server * @param {http.IncomingMessage} req The request object - * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request + * @param {Duplex} socket The socket of the upgrade request * @param {Number} code The HTTP response status code * @param {String} message The HTTP response body * @private diff --git a/lib/websocket.js b/lib/websocket.js index 35a788ac4..aa57bbade 100644 --- a/lib/websocket.js +++ b/lib/websocket.js @@ -1,4 +1,4 @@ -/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */ +/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex|Readable$", "caughtErrors": "none" }] */ 'use strict'; @@ -8,7 +8,7 @@ const http = require('http'); const net = require('net'); const tls = require('tls'); const { randomBytes, createHash } = require('crypto'); -const { Readable } = require('stream'); +const { Duplex, Readable } = require('stream'); const { URL } = require('url'); const PerMessageDeflate = require('./permessage-deflate'); @@ -84,6 +84,7 @@ class WebSocket extends EventEmitter { initAsClient(this, address, protocols, options); } else { + this._autoPong = options.autoPong; this._isServer = true; } } @@ -189,10 +190,12 @@ class WebSocket extends EventEmitter { /** * Set up the socket and the internal resources. * - * @param {(net.Socket|tls.Socket)} socket The network socket between the - * server and client + * @param {Duplex} socket The network socket between the server and client * @param {Buffer} head The first packet of the upgraded stream * @param {Object} options Options object + * @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether + * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted + * multiple times in the same tick * @param {Function} [options.generateMask] The function used to generate the * masking key * @param {Number} [options.maxPayload=0] The maximum allowed message size @@ -202,6 +205,7 @@ class WebSocket extends EventEmitter { */ setSocket(socket, head, options) { const receiver = new Receiver({ + allowSynchronousEvents: options.allowSynchronousEvents, binaryType: this.binaryType, extensions: this._extensions, isServer: this._isServer, @@ -223,8 +227,11 @@ class WebSocket extends EventEmitter { receiver.on('ping', receiverOnPing); receiver.on('pong', receiverOnPong); - socket.setTimeout(0); - socket.setNoDelay(); + // + // These methods may not be available if `socket` is just a `Duplex`. + // + if (socket.setTimeout) socket.setTimeout(0); + if (socket.setNoDelay) socket.setNoDelay(); if (head.length > 0) socket.unshift(head); @@ -616,6 +623,13 @@ module.exports = WebSocket; * @param {(String|URL)} address The URL to which to connect * @param {Array} protocols The subprotocols * @param {Object} [options] Connection options + * @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether any + * of the `'message'`, `'ping'`, and `'pong'` events can be emitted multiple + * times in the same tick + * @param {Boolean} [options.autoPong=true] Specifies whether or not to + * automatically send a pong in response to a ping + * @param {Function} [options.finishRequest] A function which can be used to + * customize the headers of each http request before it is sent * @param {Boolean} [options.followRedirects=false] Whether or not to follow * redirects * @param {Function} [options.generateMask] The function used to generate the @@ -638,6 +652,8 @@ module.exports = WebSocket; */ function initAsClient(websocket, address, protocols, options) { const opts = { + allowSynchronousEvents: true, + autoPong: true, protocolVersion: protocolVersions[1], maxPayload: 100 * 1024 * 1024, skipUTF8Validation: false, @@ -645,7 +661,6 @@ function initAsClient(websocket, address, protocols, options) { followRedirects: false, maxRedirects: 10, ...options, - createConnection: undefined, socketPath: undefined, hostname: undefined, protocol: undefined, @@ -656,6 +671,8 @@ function initAsClient(websocket, address, protocols, options) { port: undefined }; + websocket._autoPong = opts.autoPong; + if (!protocolVersions.includes(opts.protocolVersion)) { throw new RangeError( `Unsupported protocol version: ${opts.protocolVersion} ` + @@ -667,24 +684,30 @@ function initAsClient(websocket, address, protocols, options) { if (address instanceof URL) { parsedUrl = address; - websocket._url = address.href; } else { try { parsedUrl = new URL(address); } catch (e) { throw new SyntaxError(`Invalid URL: ${address}`); } + } - websocket._url = address; + if (parsedUrl.protocol === 'http:') { + parsedUrl.protocol = 'ws:'; + } else if (parsedUrl.protocol === 'https:') { + parsedUrl.protocol = 'wss:'; } + websocket._url = parsedUrl.href; + const isSecure = parsedUrl.protocol === 'wss:'; const isIpcUrl = parsedUrl.protocol === 'ws+unix:'; let invalidUrlMessage; if (parsedUrl.protocol !== 'ws:' && !isSecure && !isIpcUrl) { invalidUrlMessage = - 'The URL\'s protocol must be one of "ws:", "wss:", or "ws+unix:"'; + 'The URL\'s protocol must be one of "ws:", "wss:", ' + + '"http:", "https", or "ws+unix:"'; } else if (isIpcUrl && !parsedUrl.pathname) { invalidUrlMessage = "The URL's pathname is empty"; } else if (parsedUrl.hash) { @@ -708,7 +731,8 @@ function initAsClient(websocket, address, protocols, options) { const protocolSet = new Set(); let perMessageDeflate; - opts.createConnection = isSecure ? tlsConnect : netConnect; + opts.createConnection = + opts.createConnection || (isSecure ? tlsConnect : netConnect); opts.defaultPort = opts.defaultPort || defaultPort; opts.port = parsedUrl.port || defaultPort; opts.host = parsedUrl.hostname.startsWith('[') @@ -798,8 +822,8 @@ function initAsClient(websocket, address, protocols, options) { ? opts.socketPath === websocket._originalHostOrSocketPath : false : websocket._originalIpc - ? false - : parsedUrl.host === websocket._originalHostOrSocketPath; + ? false + : parsedUrl.host === websocket._originalHostOrSocketPath; if (!isSameHost || (websocket._originalSecure && !isSecure)) { // @@ -904,7 +928,9 @@ function initAsClient(websocket, address, protocols, options) { req = websocket._req = null; - if (res.headers.upgrade.toLowerCase() !== 'websocket') { + const upgrade = res.headers.upgrade; + + if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') { abortHandshake(websocket, socket, 'Invalid Upgrade header'); return; } @@ -983,13 +1009,18 @@ function initAsClient(websocket, address, protocols, options) { } websocket.setSocket(socket, head, { + allowSynchronousEvents: opts.allowSynchronousEvents, generateMask: opts.generateMask, maxPayload: opts.maxPayload, skipUTF8Validation: opts.skipUTF8Validation }); }); - req.end(); + if (opts.finishRequest) { + opts.finishRequest(req, websocket); + } else { + req.end(); + } } /** @@ -1189,7 +1220,7 @@ function receiverOnMessage(data, isBinary) { function receiverOnPing(data) { const websocket = this[kWebSocket]; - websocket.pong(data, !websocket._isServer, NOOP); + if (websocket._autoPong) websocket.pong(data, !this._isServer, NOOP); websocket.emit('ping', data); } @@ -1214,7 +1245,7 @@ function resume(stream) { } /** - * The listener of the `net.Socket` `'close'` event. + * The listener of the socket `'close'` event. * * @private */ @@ -1265,7 +1296,7 @@ function socketOnClose() { } /** - * The listener of the `net.Socket` `'data'` event. + * The listener of the socket `'data'` event. * * @param {Buffer} chunk A chunk of data * @private @@ -1277,7 +1308,7 @@ function socketOnData(chunk) { } /** - * The listener of the `net.Socket` `'end'` event. + * The listener of the socket `'end'` event. * * @private */ @@ -1290,7 +1321,7 @@ function socketOnEnd() { } /** - * The listener of the `net.Socket` `'error'` event. + * The listener of the socket `'error'` event. * * @private */ diff --git a/package.json b/package.json index a810d95fc..4abcf2989 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ws", - "version": "8.12.0", + "version": "8.17.1", "description": "Simple to use, blazing fast and thoroughly tested websocket client and server for Node.js", "keywords": [ "HyBi", @@ -12,12 +12,16 @@ ], "homepage": "https://github.com/websockets/ws", "bugs": "https://github.com/websockets/ws/issues", - "repository": "websockets/ws", + "repository": { + "type": "git", + "url": "git+https://github.com/websockets/ws.git" + }, "author": "Einar Otto Stangvik (http://2x.io)", "license": "MIT", "main": "index.js", "exports": { ".": { + "browser": "./browser.js", "import": "./wrapper.mjs", "require": "./index.js" }, @@ -36,7 +40,7 @@ "scripts": { "test": "nyc --reporter=lcov --reporter=text mocha --throw-deprecation test/*.test.js", "integration": "mocha --throw-deprecation test/*.integration.js", - "lint": "eslint --ignore-path .gitignore . && prettier --check --ignore-path .gitignore \"**/*.{json,md,yaml,yml}\"" + "lint": "eslint . && prettier --check --ignore-path .gitignore \"**/*.{json,md,yaml,yml}\"" }, "peerDependencies": { "bufferutil": "^4.0.1", @@ -53,12 +57,13 @@ "devDependencies": { "benchmark": "^2.1.4", "bufferutil": "^4.0.1", - "eslint": "^8.0.0", - "eslint-config-prettier": "^8.1.0", - "eslint-plugin-prettier": "^4.0.0", + "eslint": "^9.0.0", + "eslint-config-prettier": "^9.0.0", + "eslint-plugin-prettier": "^5.0.0", + "globals": "^15.0.0", "mocha": "^8.4.0", "nyc": "^15.0.0", - "prettier": "^2.0.5", + "prettier": "^3.0.0", "utf-8-validate": "^6.0.0" } } diff --git a/test/create-websocket-stream.test.js b/test/create-websocket-stream.test.js index 4d51958cd..54a13c6c8 100644 --- a/test/create-websocket-stream.test.js +++ b/test/create-websocket-stream.test.js @@ -3,7 +3,7 @@ const assert = require('assert'); const EventEmitter = require('events'); const { createServer } = require('http'); -const { Duplex } = require('stream'); +const { Duplex, getDefaultHighWaterMark } = require('stream'); const { randomBytes } = require('crypto'); const createWebSocketStream = require('../lib/stream'); @@ -11,6 +11,10 @@ const Sender = require('../lib/sender'); const WebSocket = require('..'); const { EMPTY_BUFFER } = require('../lib/constants'); +const highWaterMark = getDefaultHighWaterMark + ? getDefaultHighWaterMark(false) + : 16 * 1024; + describe('createWebSocketStream', () => { it('is exposed as a property of the `WebSocket` class', () => { assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream); @@ -295,11 +299,14 @@ describe('createWebSocketStream', () => { ws._socket.write(Buffer.from([0x85, 0x00])); }); - assert.strictEqual(process.listenerCount('uncaughtException'), 1); + assert.strictEqual( + process.listenerCount('uncaughtException'), + EventEmitter.usingDomains ? 2 : 1 + ); - const [listener] = process.listeners('uncaughtException'); + const listener = process.listeners('uncaughtException').pop(); - process.removeAllListeners('uncaughtException'); + process.removeListener('uncaughtException', listener); process.once('uncaughtException', (err) => { assert.ok(err instanceof Error); assert.strictEqual( @@ -442,12 +449,15 @@ describe('createWebSocketStream', () => { }; const list = [ - ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), + ...Sender.frame(randomBytes(highWaterMark), { + rsv1: false, + ...opts + }), ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) ]; // This hack is used because there is no guarantee that more than - // 16 KiB will be sent as a single TCP packet. + // `highWaterMark` bytes will be sent as a single TCP packet. ws._socket.push(Buffer.concat(list)); }); @@ -491,7 +501,10 @@ describe('createWebSocketStream', () => { }; const list = [ - ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), + ...Sender.frame(randomBytes(highWaterMark), { + rsv1: false, + ...opts + }), ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) ]; @@ -591,7 +604,7 @@ describe('createWebSocketStream', () => { }); wss.on('connection', (ws) => { - ws.send(randomBytes(16 * 1024)); + ws.send(randomBytes(highWaterMark)); }); }); }); diff --git a/test/duplex-pair.js b/test/duplex-pair.js new file mode 100644 index 000000000..92d5e778e --- /dev/null +++ b/test/duplex-pair.js @@ -0,0 +1,73 @@ +// +// This code was copied from +// https://github.com/nodejs/node/blob/c506660f3267/test/common/duplexpair.js +// +// Copyright Node.js contributors. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +// IN THE SOFTWARE. +// +'use strict'; + +const assert = require('assert'); +const { Duplex } = require('stream'); + +const kCallback = Symbol('Callback'); +const kOtherSide = Symbol('Other'); + +class DuplexSocket extends Duplex { + constructor() { + super(); + this[kCallback] = null; + this[kOtherSide] = null; + } + + _read() { + const callback = this[kCallback]; + if (callback) { + this[kCallback] = null; + callback(); + } + } + + _write(chunk, encoding, callback) { + assert.notStrictEqual(this[kOtherSide], null); + assert.strictEqual(this[kOtherSide][kCallback], null); + if (chunk.length === 0) { + process.nextTick(callback); + } else { + this[kOtherSide].push(chunk); + this[kOtherSide][kCallback] = callback; + } + } + + _final(callback) { + this[kOtherSide].on('end', callback); + this[kOtherSide].push(null); + } +} + +function makeDuplexPair() { + const clientSide = new DuplexSocket(); + const serverSide = new DuplexSocket(); + clientSide[kOtherSide] = serverSide; + serverSide[kOtherSide] = clientSide; + return { clientSide, serverSide }; +} + +module.exports = makeDuplexPair; diff --git a/test/receiver.test.js b/test/receiver.test.js index 4ae279469..1f9e75d3a 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -2,6 +2,7 @@ const assert = require('assert'); const crypto = require('crypto'); +const EventEmitter = require('events'); const PerMessageDeflate = require('../lib/permessage-deflate'); const Receiver = require('../lib/receiver'); @@ -1083,4 +1084,80 @@ describe('Receiver', () => { receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8])); }); + + it('honors the `allowSynchronousEvents` option', (done) => { + const actual = []; + const expected = [ + '1', + '- 1', + '-- 1', + '2', + '- 2', + '-- 2', + '3', + '- 3', + '-- 3', + '4', + '- 4', + '-- 4' + ]; + + function listener(data) { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`- ${message}`); + + Promise.resolve().then(() => { + actual.push(`-- ${message}`); + + if (actual.length === 12) { + assert.deepStrictEqual(actual, expected); + done(); + } + }); + }); + } + + const receiver = new Receiver({ allowSynchronousEvents: false }); + + receiver.on('message', listener); + receiver.on('ping', listener); + receiver.on('pong', listener); + + receiver.write(Buffer.from('8101318901328a0133820134', 'hex')); + }); + + it('does not swallow errors thrown from event handlers', (done) => { + const receiver = new Receiver(); + let count = 0; + + receiver.on('message', () => { + if (++count === 2) { + throw new Error('Oops'); + } + }); + + assert.strictEqual( + process.listenerCount('uncaughtException'), + EventEmitter.usingDomains ? 2 : 1 + ); + + const listener = process.listeners('uncaughtException').pop(); + + process.removeListener('uncaughtException', listener); + process.once('uncaughtException', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.message, 'Oops'); + + process.on('uncaughtException', listener); + done(); + }); + + setImmediate(() => { + receiver.write(Buffer.from('82008200', 'hex')); + }); + }); }); diff --git a/test/websocket-server.test.js b/test/websocket-server.test.js index abed1650a..34de4dcfa 100644 --- a/test/websocket-server.test.js +++ b/test/websocket-server.test.js @@ -11,6 +11,7 @@ const net = require('net'); const fs = require('fs'); const os = require('os'); +const makeDuplexPair = require('./duplex-pair'); const Sender = require('../lib/sender'); const WebSocket = require('..'); const { NOOP } = require('../lib/constants'); @@ -115,6 +116,30 @@ describe('WebSocketServer', () => { wss.close(done); }); }); + + it('honors the `autoPong` option', (done) => { + const wss = new WebSocket.Server({ autoPong: false, port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.ping(); + }); + + ws.on('pong', () => { + done(new Error("Unexpected 'pong' event")); + }); + }); + + wss.on('connection', (ws) => { + ws.on('ping', () => { + ws.close(); + }); + + ws.on('close', () => { + wss.close(done); + }); + }); + }); }); it('emits an error if http server bind fails', (done) => { @@ -280,11 +305,15 @@ describe('WebSocketServer', () => { it('cleans event handlers on precreated server', (done) => { const server = http.createServer(); + const listeningListenerCount = server.listenerCount('listening'); const wss = new WebSocket.Server({ server }); server.listen(0, () => { wss.close(() => { - assert.strictEqual(server.listenerCount('listening'), 0); + assert.strictEqual( + server.listenerCount('listening'), + listeningListenerCount + ); assert.strictEqual(server.listenerCount('upgrade'), 0); assert.strictEqual(server.listenerCount('error'), 0); @@ -514,6 +543,40 @@ describe('WebSocketServer', () => { }); }); }); + + it('completes a WebSocket upgrade over any duplex stream', (done) => { + const server = http.createServer(); + + server.listen(0, () => { + const wss = new WebSocket.Server({ noServer: true }); + + server.on('upgrade', (req, socket, head) => { + // + // Put a stream between the raw socket and our websocket processing. + // + const { clientSide, serverSide } = makeDuplexPair(); + + socket.pipe(clientSide); + clientSide.pipe(socket); + + // + // Pass the other side of the stream as the socket to upgrade. + // + wss.handleUpgrade(req, serverSide, head, (ws) => { + ws.send('hello'); + ws.close(); + }); + }); + + const ws = new WebSocket(`ws://localhost:${server.address().port}`); + + ws.on('message', (message, isBinary) => { + assert.deepStrictEqual(message, Buffer.from('hello')); + assert.ok(!isBinary); + server.close(done); + }); + }); + }); }); describe('#completeUpgrade', () => { @@ -590,6 +653,50 @@ describe('WebSocketServer', () => { }); }); + it('fails if the Upgrade header field value cannot be read', (done) => { + const server = http.createServer(); + const wss = new WebSocket.Server({ noServer: true }); + + server.maxHeadersCount = 1; + + server.on('upgrade', (req, socket, head) => { + assert.deepStrictEqual(req.headers, { foo: 'bar' }); + wss.handleUpgrade(req, socket, head, () => { + done(new Error('Unexpected callback invocation')); + }); + }); + + server.listen(() => { + const req = http.get({ + port: server.address().port, + headers: { + foo: 'bar', + bar: 'baz', + Connection: 'Upgrade', + Upgrade: 'websocket' + } + }); + + req.on('response', (res) => { + assert.strictEqual(res.statusCode, 400); + + const chunks = []; + + res.on('data', (chunk) => { + chunks.push(chunk); + }); + + res.on('end', () => { + assert.strictEqual( + Buffer.concat(chunks).toString(), + 'Invalid Upgrade header' + ); + server.close(done); + }); + }); + }); + }); + it('fails if the Upgrade header field value is not "websocket"', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { const req = http.get({ @@ -1213,7 +1320,9 @@ describe('WebSocketServer', () => { it("emits the 'headers' event", (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { - const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const ws = new WebSocket( + `ws://localhost:${wss.address().port}?foo=bar` + ); ws.on('open', ws.close); }); @@ -1225,7 +1334,7 @@ describe('WebSocketServer', () => { 'Connection: Upgrade' ]); assert.ok(request instanceof http.IncomingMessage); - assert.strictEqual(request.url, '/'); + assert.strictEqual(request.url, '/?foo=bar'); wss.on('connection', () => wss.close(done)); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index 6b2f3ef5c..8a05f073b 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -11,6 +11,7 @@ const net = require('net'); const tls = require('tls'); const os = require('os'); const fs = require('fs'); +const { getDefaultHighWaterMark } = require('stream'); const { URL } = require('url'); const Sender = require('../lib/sender'); @@ -23,6 +24,10 @@ const { } = require('../lib/event-target'); const { EMPTY_BUFFER, GUID, kListener, NOOP } = require('../lib/constants'); +const highWaterMark = getDefaultHighWaterMark + ? getDefaultHighWaterMark(false) + : 16 * 1024; + class CustomAgent extends http.Agent { addRequest() {} } @@ -36,8 +41,16 @@ describe('WebSocket', () => { ); assert.throws( - () => new WebSocket('https://websocket-echo.com'), - /^SyntaxError: The URL's protocol must be one of "ws:", "wss:", or "ws\+unix:"$/ + () => new WebSocket('bad-scheme://websocket-echo.com'), + (err) => { + assert.strictEqual( + err.message, + 'The URL\'s protocol must be one of "ws:", "wss:", ' + + '"http:", "https", or "ws+unix:"' + ); + + return true; + } ); assert.throws( @@ -61,7 +74,7 @@ describe('WebSocket', () => { }); it('accepts `url.URL` objects as url', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req, opts) => { assert.strictEqual(opts.host, '::1'); @@ -72,9 +85,33 @@ describe('WebSocket', () => { const ws = new WebSocket(new URL('ws://[::1]'), { agent }); }); + it('allows the http scheme', (done) => { + const agent = new CustomAgent(); + + agent.addRequest = (req, opts) => { + assert.strictEqual(opts.host, 'localhost'); + assert.strictEqual(opts.port, 80); + done(); + }; + + const ws = new WebSocket('http://localhost', { agent }); + }); + + it('allows the https scheme', (done) => { + const agent = new https.Agent(); + + agent.addRequest = (req, opts) => { + assert.strictEqual(opts.host, 'localhost'); + assert.strictEqual(opts.port, 443); + done(); + }; + + const ws = new WebSocket('https://localhost', { agent }); + }); + describe('options', () => { it('accepts the `options` object as 3rd argument', () => { - const agent = new CustomAgent(); + const agent = new http.Agent(); let count = 0; let ws; @@ -122,10 +159,8 @@ describe('WebSocket', () => { }); it('throws an error when using an invalid `protocolVersion`', () => { - const options = { agent: new CustomAgent(), protocolVersion: 1000 }; - assert.throws( - () => new WebSocket('ws://localhost', options), + () => new WebSocket('ws://localhost', { protocolVersion: 1000 }), /^RangeError: Unsupported protocol version: 1000 \(supported versions: 8, 13\)$/ ); }); @@ -167,6 +202,30 @@ describe('WebSocket', () => { }); }); }); + + it('honors the `autoPong` option', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`, { + autoPong: false + }); + + ws.on('ping', () => { + ws.close(); + }); + + ws.on('close', () => { + wss.close(done); + }); + }); + + wss.on('connection', (ws) => { + ws.on('pong', () => { + done(new Error("Unexpected 'pong' event")); + }); + + ws.ping(); + }); + }); }); }); @@ -541,10 +600,18 @@ describe('WebSocket', () => { }); it('exposes the server url', () => { - const url = 'ws://localhost'; - const ws = new WebSocket(url, { agent: new CustomAgent() }); + const schemes = new Map([ + ['ws', 'ws'], + ['wss', 'wss'], + ['http', 'ws'], + ['https', 'wss'] + ]); + + for (const [key, value] of schemes) { + const ws = new WebSocket(`${key}://localhost/`, { lookup() {} }); - assert.strictEqual(ws.url, url); + assert.strictEqual(ws.url, `${value}://localhost/`); + } }); }); }); @@ -588,15 +655,19 @@ describe('WebSocket', () => { }); }); - it('does not re-emit `net.Socket` errors', (done) => { - const codes = ['EPIPE', 'ECONNABORTED', 'ECANCELED', 'ECONNRESET']; + it('does not re-emit `net.Socket` errors', function (done) { + // + // `socket.resetAndDestroy()` is not available in Node.js < 16.17.0. + // + if (process.versions.modules < 93) return this.skip(); + const wss = new WebSocket.Server({ port: 0 }, () => { const ws = new WebSocket(`ws://localhost:${wss.address().port}`); ws.on('open', () => { ws._socket.on('error', (err) => { assert.ok(err instanceof Error); - assert.ok(codes.includes(err.code), `Unexpected code: ${err.code}`); + assert.strictEqual(err.code, 'ECONNRESET'); ws.on('close', (code, message) => { assert.strictEqual(code, 1006); assert.strictEqual(message, EMPTY_BUFFER); @@ -604,9 +675,7 @@ describe('WebSocket', () => { }); }); - for (const client of wss.clients) client.terminate(); - ws.send('foo'); - ws.send('bar'); + wss.clients.values().next().value._socket.resetAndDestroy(); }); }); }); @@ -688,6 +757,32 @@ describe('WebSocket', () => { beforeEach((done) => server.listen(0, done)); afterEach((done) => server.close(done)); + it('fails if the Upgrade header field value cannot be read', (done) => { + server.once('upgrade', (req, socket) => { + socket.on('end', socket.end); + socket.write( + 'HTTP/1.1 101 Switching Protocols\r\n' + + 'Connection: Upgrade\r\n' + + 'Upgrade: websocket\r\n' + + '\r\n' + ); + }); + + const ws = new WebSocket(`ws://localhost:${server.address().port}`); + + ws._req.maxHeadersCount = 1; + + ws.on('upgrade', (res) => { + assert.deepStrictEqual(res.headers, { connection: 'Upgrade' }); + + ws.on('error', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.message, 'Invalid Upgrade header'); + done(); + }); + }); + }); + it('fails if the Upgrade header field value is not "websocket"', (done) => { server.once('upgrade', (req, socket) => { socket.on('end', socket.end); @@ -1089,6 +1184,35 @@ describe('WebSocket', () => { }); }); + it('honors the `createConnection` option', (done) => { + const wss = new WebSocket.Server({ noServer: true, path: '/foo' }); + + server.once('upgrade', (req, socket, head) => { + assert.strictEqual(req.headers.host, 'google.com:22'); + wss.handleUpgrade(req, socket, head, NOOP); + }); + + const ws = new WebSocket('ws://google.com:22/foo', { + createConnection: (options) => { + assert.strictEqual(options.host, 'google.com'); + assert.strictEqual(options.port, '22'); + + // Ignore the `options` argument, and use the correct hostname and + // port to connect to the server. + return net.createConnection({ + host: 'localhost', + port: server.address().port + }); + } + }); + + ws.on('open', () => { + assert.strictEqual(ws.url, 'ws://google.com:22/foo'); + ws.on('close', () => done()); + ws.close(); + }); + }); + it('does not follow redirects by default', (done) => { server.once('upgrade', (req, socket) => { socket.end( @@ -1176,7 +1300,9 @@ describe('WebSocket', () => { it('emits an error if the redirect URL is invalid (2/2)', (done) => { server.once('upgrade', (req, socket) => { - socket.end('HTTP/1.1 302 Found\r\nLocation: http://localhost\r\n\r\n'); + socket.end( + 'HTTP/1.1 302 Found\r\nLocation: bad-scheme://localhost\r\n\r\n' + ); }); const ws = new WebSocket(`ws://localhost:${server.address().port}`, { @@ -1188,7 +1314,8 @@ describe('WebSocket', () => { assert.ok(err instanceof SyntaxError); assert.strictEqual( err.message, - 'The URL\'s protocol must be one of "ws:", "wss:", or "ws+unix:"' + 'The URL\'s protocol must be one of "ws:", "wss:", ' + + '"http:", "https", or "ws+unix:"' ); assert.strictEqual(ws._redirects, 1); @@ -1901,38 +2028,6 @@ describe('WebSocket', () => { }); }); - describe('Connection with query string', () => { - it('connects when pathname is not null', (done) => { - const wss = new WebSocket.Server({ port: 0 }, () => { - const port = wss.address().port; - const ws = new WebSocket(`ws://localhost:${port}/?token=qwerty`); - - ws.on('open', () => { - wss.close(done); - }); - }); - - wss.on('connection', (ws) => { - ws.close(); - }); - }); - - it('connects when pathname is null', (done) => { - const wss = new WebSocket.Server({ port: 0 }, () => { - const port = wss.address().port; - const ws = new WebSocket(`ws://localhost:${port}?token=qwerty`); - - ws.on('open', () => { - wss.close(done); - }); - }); - - wss.on('connection', (ws) => { - ws.close(); - }); - }); - }); - describe('#pause', () => { it('does nothing if `readyState` is `CONNECTING` or `CLOSED`', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { @@ -2314,6 +2409,29 @@ describe('WebSocket', () => { ws.close(); }); }); + + it('is called automatically when a ping is received', (done) => { + const buf = Buffer.from('hi'); + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + ws.on('open', () => { + ws.ping(buf); + }); + + ws.on('pong', (data) => { + assert.deepStrictEqual(data, buf); + wss.close(done); + }); + }); + + wss.on('connection', (ws) => { + ws.on('ping', (data) => { + assert.deepStrictEqual(data, buf); + ws.close(); + }); + }); + }); }); describe('#resume', () => { @@ -2458,7 +2576,7 @@ describe('WebSocket', () => { it('can send a big binary message', (done) => { const wss = new WebSocket.Server({ port: 0 }, () => { - const array = new Float32Array(5 * 1024 * 1024); + const array = new Float32Array(1024 * 1024); for (let i = 0; i < array.length; i++) { array[i] = i / 5; @@ -2804,16 +2922,28 @@ describe('WebSocket', () => { }); it('can be called from an error listener while connecting', (done) => { - const ws = new WebSocket('ws://localhost:1337'); + const server = net.createServer(); - ws.on('open', () => done(new Error("Unexpected 'open' event"))); - ws.on('error', (err) => { - assert.ok(err instanceof Error); - assert.strictEqual(err.code, 'ECONNREFUSED'); - ws.close(); - ws.on('close', () => done()); + server.on('connection', (socket) => { + socket.on('end', socket.end); + socket.resume(); + socket.write(Buffer.from('foo\r\n')); }); - }).timeout(4000); + + server.listen(0, () => { + const ws = new WebSocket(`ws://localhost:${server.address().port}`); + + ws.on('open', () => done(new Error("Unexpected 'open' event"))); + ws.on('error', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.code, 'HPE_INVALID_CONSTANT'); + ws.close(); + ws.on('close', () => { + server.close(done); + }); + }); + }); + }); it("can be called from a listener of the 'redirect' event", (done) => { const server = http.createServer(); @@ -3078,16 +3208,28 @@ describe('WebSocket', () => { }); it('can be called from an error listener while connecting', (done) => { - const ws = new WebSocket('ws://localhost:1337'); + const server = net.createServer(); - ws.on('open', () => done(new Error("Unexpected 'open' event"))); - ws.on('error', (err) => { - assert.ok(err instanceof Error); - assert.strictEqual(err.code, 'ECONNREFUSED'); - ws.terminate(); - ws.on('close', () => done()); + server.on('connection', (socket) => { + socket.on('end', socket.end); + socket.resume(); + socket.write(Buffer.from('foo\r\n')); }); - }).timeout(4000); + + server.listen(0, () => { + const ws = new WebSocket(`ws://localhost:${server.address().port}`); + + ws.on('open', () => done(new Error("Unexpected 'open' event"))); + ws.on('error', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.code, 'HPE_INVALID_CONSTANT'); + ws.terminate(); + ws.on('close', () => { + server.close(done); + }); + }); + }); + }); it("can be called from a listener of the 'redirect' event", (done) => { const server = http.createServer(); @@ -3741,7 +3883,7 @@ describe('WebSocket', () => { describe('Request headers', () => { it('adds the authorization header if the url has userinfo', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); const userinfo = 'test:testpass'; agent.addRequest = (req) => { @@ -3756,7 +3898,7 @@ describe('WebSocket', () => { }); it('honors the `auth` option', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); const auth = 'user:pass'; agent.addRequest = (req) => { @@ -3771,7 +3913,7 @@ describe('WebSocket', () => { }); it('favors the url userinfo over the `auth` option', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); const auth = 'foo:bar'; const userinfo = 'baz:qux'; @@ -3787,7 +3929,7 @@ describe('WebSocket', () => { }); it('adds custom headers', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual(req.getHeader('cookie'), 'foo=bar'); @@ -3816,7 +3958,7 @@ describe('WebSocket', () => { }); it("doesn't add the origin header by default", (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual(req.getHeader('origin'), undefined); @@ -3827,7 +3969,7 @@ describe('WebSocket', () => { }); it('honors the `origin` option (1/2)', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual(req.getHeader('origin'), 'https://example.com:8000'); @@ -3841,7 +3983,7 @@ describe('WebSocket', () => { }); it('honors the `origin` option (2/2)', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual( @@ -3857,11 +3999,42 @@ describe('WebSocket', () => { agent }); }); + + it('honors the `finishRequest` option', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const host = `localhost:${wss.address().port}`; + const ws = new WebSocket(`ws://${host}`, { + finishRequest(req, ws) { + assert.ok(req instanceof http.ClientRequest); + assert.strictEqual(req.getHeader('host'), host); + assert.ok(ws instanceof WebSocket); + assert.strictEqual(req, ws._req); + + req.on('socket', (socket) => { + socket.on('connect', () => { + req.setHeader('Cookie', 'foo=bar'); + req.end(); + }); + }); + } + }); + + ws.on('close', (code) => { + assert.strictEqual(code, 1005); + wss.close(done); + }); + }); + + wss.on('connection', (ws, req) => { + assert.strictEqual(req.headers.cookie, 'foo=bar'); + ws.close(); + }); + }); }); describe('permessage-deflate', () => { it('is enabled by default', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual( @@ -3875,7 +4048,7 @@ describe('WebSocket', () => { }); it('can be disabled', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); agent.addRequest = (req) => { assert.strictEqual( @@ -3892,7 +4065,7 @@ describe('WebSocket', () => { }); it('can send extension parameters', (done) => { - const agent = new CustomAgent(); + const agent = new http.Agent(); const value = 'permessage-deflate; server_no_context_takeover;' + @@ -3979,7 +4152,7 @@ describe('WebSocket', () => { ws.terminate(); }; - const payload1 = Buffer.alloc(15 * 1024); + const payload1 = Buffer.alloc(highWaterMark - 1024); const payload2 = Buffer.alloc(1); const opts = { @@ -3994,13 +4167,17 @@ describe('WebSocket', () => { ...Sender.frame(payload2, { rsv1: true, ...opts }) ]; - for (let i = 0; i < 399; i++) { + for (let i = 0; i < 340; i++) { list.push(list[list.length - 2], list[list.length - 1]); } + const data = Buffer.concat(list); + + assert.ok(data.length > highWaterMark); + // This hack is used because there is no guarantee that more than - // 16 KiB will be sent as a single TCP packet. - push.call(ws._socket, Buffer.concat(list)); + // `highWaterMark` bytes will be sent as a single TCP packet. + push.call(ws._socket, data); wss.clients .values() @@ -4015,8 +4192,8 @@ describe('WebSocket', () => { ws.on('close', (code) => { assert.strictEqual(code, 1006); - assert.strictEqual(messageLengths.length, 402); - assert.strictEqual(messageLengths[0], 15360); + assert.strictEqual(messageLengths.length, 343); + assert.strictEqual(messageLengths[0], highWaterMark - 1024); assert.strictEqual(messageLengths[messageLengths.length - 1], 1); wss.close(done); }); @@ -4078,18 +4255,17 @@ describe('WebSocket', () => { const messages = []; const ws = new WebSocket(`ws://localhost:${wss.address().port}`); - ws.on('open', () => { - ws._socket.on('end', () => { - assert.strictEqual(ws._receiver._state, 5); - }); - }); - ws.on('message', (message, isBinary) => { assert.ok(!isBinary); if (messages.push(message.toString()) > 1) return; - ws.close(1000); + setImmediate(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.close(1000); + }); + }); }); ws.on('close', (code, reason) => { @@ -4273,6 +4449,7 @@ describe('WebSocket', () => { 'The socket was closed while data was being compressed' ); }); + ws.close(); }); } ); @@ -4334,9 +4511,11 @@ describe('WebSocket', () => { if (messages.push(message.toString()) > 1) return; - process.nextTick(() => { - assert.strictEqual(ws._receiver._state, 5); - ws.terminate(); + setImmediate(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.terminate(); + }); }); });