Skip to content

Commit

Permalink
fix(ext/websocket): correctly order messages when sending blobs (deno…
Browse files Browse the repository at this point in the history
…land#24133)

Previously the asynchronous read of the blob would not block sends that
are started later. We now do this, but in such a way as to not regress
performance in the common case of not using `Blob`.
  • Loading branch information
lucacasonato committed Jun 8, 2024
1 parent c1f23c5 commit 585ba28
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 16 deletions.
75 changes: 59 additions & 16 deletions ext/websocket/01_websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const {
ArrayBufferIsView,
ArrayPrototypeJoin,
ArrayPrototypeMap,
ArrayPrototypePush,
ArrayPrototypeShift,
ArrayPrototypeSome,
ErrorPrototypeToString,
ObjectDefineProperties,
Expand All @@ -41,7 +43,6 @@ const {
SymbolFor,
SymbolIterator,
TypedArrayPrototypeGetByteLength,
Uint8Array,
} = primordials;

import { URL } from "ext:deno_url/00_url.js";
Expand Down Expand Up @@ -111,11 +112,14 @@ const _extensions = Symbol("[[extensions]]");
const _protocol = Symbol("[[protocol]]");
const _binaryType = Symbol("[[binaryType]]");
const _eventLoop = Symbol("[[eventLoop]]");
const _sendQueue = Symbol("[[sendQueue]]");
const _queueSend = Symbol("[[queueSend]]");

const _server = Symbol("[[server]]");
const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");

class WebSocket extends EventTarget {
constructor(url, protocols = []) {
super();
Expand All @@ -129,6 +133,8 @@ class WebSocket extends EventTarget {
this[_binaryType] = "blob";
this[_idleTimeoutDuration] = 0;
this[_idleTimeoutTimeout] = undefined;
this[_sendQueue] = [];

const prefix = "Failed to construct 'WebSocket'";
webidl.requiredArguments(arguments.length, 1, prefix);
url = webidl.converters.USVString(url, prefix, "Argument 1");
Expand Down Expand Up @@ -326,22 +332,26 @@ class WebSocket extends EventTarget {
throw new DOMException("readyState not OPEN", "InvalidStateError");
}

if (ArrayBufferIsView(data)) {
op_ws_send_binary(this[_rid], data);
} else if (isArrayBuffer(data)) {
op_ws_send_binary(this[_rid], new Uint8Array(data));
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
PromisePrototypeThen(
// deno-lint-ignore prefer-primordials
data.slice().arrayBuffer(),
(ab) => op_ws_send_binary_ab(this[_rid], ab),
);
if (this[_sendQueue].length === 0) {
// Fast path if the send queue is empty, for example when only synchronous
// data is being sent.
if (ArrayBufferIsView(data)) {
op_ws_send_binary(this[_rid], data);
} else if (isArrayBuffer(data)) {
op_ws_send_binary_ab(this[_rid], data);
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
this[_queueSend](data);
} else {
const string = String(data);
op_ws_send_text(
this[_rid],
string,
);
}
} else {
const string = String(data);
op_ws_send_text(
this[_rid],
string,
);
// Slower path if the send queue is not empty, for example when sending
// asynchronous data like a Blob.
this[_queueSend](data);
}
}

Expand Down Expand Up @@ -508,6 +518,38 @@ class WebSocket extends EventTarget {
}
}

async [_queueSend](data) {
const queue = this[_sendQueue];

ArrayPrototypePush(queue, data);

if (queue.length > 1) {
// There is already a send in progress, so we just push to the queue
// and let that task handle sending of this data.
return;
}

while (queue.length > 0) {
const data = queue[0];
if (ArrayBufferIsView(data)) {
op_ws_send_binary(this[_rid], data);
} else if (isArrayBuffer(data)) {
op_ws_send_binary_ab(this[_rid], data);
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
// deno-lint-ignore prefer-primordials
const ab = await data.slice().arrayBuffer();
op_ws_send_binary_ab(this[_rid], ab);
} else {
const string = String(data);
op_ws_send_text(
this[_rid],
string,
);
}
ArrayPrototypeShift(queue);
}
}

[_serverHandleIdleTimeout]() {
if (this[_idleTimeoutDuration]) {
clearTimeout(this[_idleTimeoutTimeout]);
Expand Down Expand Up @@ -608,6 +650,7 @@ function createWebSocketBranded() {
socket[_binaryType] = "arraybuffer";
socket[_idleTimeoutDuration] = 0;
socket[_idleTimeoutTimeout] = undefined;
socket[_sendQueue] = [];
return socket;
}

Expand Down
25 changes: 25 additions & 0 deletions tests/unit/websocket_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,31 @@ Deno.test("echo arraybuffer with binaryType arraybuffer", async () => {
await promise;
});

Deno.test("echo blob mixed with string", async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const ws = new WebSocket("ws:https://localhost:4242");
ws.binaryType = "arraybuffer";
const blob = new Blob(["foo"]);
ws.onerror = () => fail();
ws.onopen = () => {
ws.send(blob);
ws.send("bar");
};
const messages: (ArrayBuffer | string)[] = [];
ws.onmessage = (e) => {
messages.push(e.data);
if (messages.length === 2) {
assertEquals(messages[0], new Uint8Array([102, 111, 111]).buffer);
assertEquals(messages[1], "bar");
ws.close();
}
};
ws.onclose = () => {
resolve();
};
await promise;
});

Deno.test("Event Handlers order", async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const ws = new WebSocket("ws:https://localhost:4242");
Expand Down

0 comments on commit 585ba28

Please sign in to comment.