Skip to content

Commit

Permalink
feat(unstable): Add Deno.upgradeHttp API (denoland#13618)
Browse files Browse the repository at this point in the history
This commit adds "Deno.upgradeHttp" API, which
allows to "hijack" connection and switch protocols, to eg.
implement WebSocket required for Node compat.

Co-authored-by: crowlkats <[email protected]>
Co-authored-by: Ryan Dahl <[email protected]>
Co-authored-by: Bartek Iwańczuk <[email protected]>
  • Loading branch information
4 people committed Mar 16, 2022
1 parent 89a41d0 commit c5270ab
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 9 deletions.
14 changes: 14 additions & 0 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,20 @@ declare namespace Deno {
* Make the timer of the given id not blocking the event loop from finishing
*/
export function unrefTimer(id: number): void;

/** **UNSTABLE**: new API, yet to be vetter.
*
* Allows to "hijack" a connection that the request is associated with.
* Can be used to implement protocols that build on top of HTTP (eg.
* WebSockets).
*
* The returned promise returns underlying connection and first packet
* received. The promise shouldn't be awaited before responding to the
* `request`, otherwise event loop might deadlock.
*/
export function upgradeHttp(
request: Request,
): Promise<[Deno.Conn, Uint8Array]>;
}

declare function fetch(
Expand Down
44 changes: 44 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
BufWriter,
} from "../../../test_util/std/io/buffer.ts";
import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
import { serve } from "../../../test_util/std/http/server.ts";
import {
assert,
assertEquals,
Expand Down Expand Up @@ -1738,6 +1739,49 @@ Deno.test({
},
});

Deno.test("upgradeHttp", async () => {
async function client() {
const tcpConn = await Deno.connect({ port: 4501 });
await tcpConn.write(
new TextEncoder().encode(
"CONNECT server.example.com:80 HTTP/1.1\r\n\r\nbla bla bla\nbla bla\nbla\n",
),
);
setTimeout(async () => {
await tcpConn.write(
new TextEncoder().encode(
"bla bla bla\nbla bla\nbla\n",
),
);
tcpConn.close();
}, 500);
}

const abortController = new AbortController();
const signal = abortController.signal;

const server = serve((req) => {
const p = Deno.upgradeHttp(req);

(async () => {
const [conn, firstPacket] = await p;
const buf = new Uint8Array(1024);
const firstPacketText = new TextDecoder().decode(firstPacket);
assertEquals(firstPacketText, "bla bla bla\nbla bla\nbla\n");
const n = await conn.read(buf);
assert(n != null);
const secondPacketText = new TextDecoder().decode(buf.slice(0, n));
assertEquals(secondPacketText, "bla bla bla\nbla bla\nbla\n");
abortController.abort();
conn.close();
})();

return new Response(null, { status: 101 });
}, { port: 4501, signal });

await Promise.all([server, client()]);
});

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
47 changes: 44 additions & 3 deletions ext/http/01_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@
_idleTimeoutTimeout,
_serverHandleIdleTimeout,
} = window.__bootstrap.webSocket;
const { TcpConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
const { Deferred } = window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
ArrayPrototypeSome,
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototype,
Set,
Expand All @@ -53,19 +57,24 @@
} = window.__bootstrap.primordials;

const connErrorSymbol = Symbol("connError");
const _deferred = Symbol("upgradeHttpDeferred");

class HttpConn {
#rid = 0;
#closed = false;
#remoteAddr;
#localAddr;

// This set holds resource ids of resources
// that were created during lifecycle of this request.
// When the connection is closed these resources should be closed
// as well.
managedResources = new Set();

constructor(rid) {
constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
this.#localAddr = localAddr;
}

/** @returns {number} */
Expand Down Expand Up @@ -125,7 +134,13 @@
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");

const respondWith = createRespondWith(this, streamRid);
const respondWith = createRespondWith(
this,
streamRid,
request,
this.#remoteAddr,
this.#localAddr,
);

return { request, respondWith };
}
Expand Down Expand Up @@ -159,7 +174,13 @@
return core.opAsync("op_http_read", streamRid, buf);
}

function createRespondWith(httpConn, streamRid) {
function createRespondWith(
httpConn,
streamRid,
request,
remoteAddr,
localAddr,
) {
return async function respondWith(resp) {
try {
if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
Expand Down Expand Up @@ -282,6 +303,20 @@
}
}

const deferred = request[_deferred];
if (deferred) {
const res = await core.opAsync("op_http_upgrade", streamRid);
let conn;
if (res.connType === "tcp") {
conn = new TcpConn(res.connRid, remoteAddr, localAddr);
} else if (res.connType === "tls") {
conn = new TlsConn(res.connRid, remoteAddr, localAddr);
} else {
throw new Error("unreachable");
}

deferred.resolve([conn, res.readBuf]);
}
const ws = resp[_ws];
if (ws) {
const wsRid = await core.opAsync(
Expand Down Expand Up @@ -425,8 +460,14 @@
return { response, socket };
}

function upgradeHttp(req) {
req[_deferred] = new Deferred();
return req[_deferred].promise;
}

window.__bootstrap.http = {
HttpConn,
upgradeWebSocket,
upgradeHttp,
};
})(this);
6 changes: 3 additions & 3 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ impl HttpAcceptor {
}

/// A resource representing a single HTTP request/response stream.
struct HttpStreamResource {
pub struct HttpStreamResource {
conn: Rc<HttpConnResource>,
rd: AsyncRefCell<HttpRequestReader>,
pub rd: AsyncRefCell<HttpRequestReader>,
wr: AsyncRefCell<HttpResponseWriter>,
accept_encoding: RefCell<Encoding>,
cancel_handle: CancelHandle,
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Resource for HttpStreamResource {
}

/// The read half of an HTTP stream.
enum HttpRequestReader {
pub enum HttpRequestReader {
Headers(Request<Body>),
Body(Peekable<Body>),
Closed,
Expand Down
2 changes: 1 addition & 1 deletion ext/net/ops_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl TlsStream {
Self::new(tcp, Connection::Server(tls))
}

fn into_split(self) -> (ReadHalf, WriteHalf) {
pub fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
shared: shared.clone(),
Expand Down
2 changes: 1 addition & 1 deletion runtime/js/40_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

function serveHttp(conn) {
const rid = core.opSync("op_http_start", conn.rid);
return new HttpConn(rid);
return new HttpConn(rid, conn.remoteAddr, conn.localAddr);
}

window.__bootstrap.http.serveHttp = serveHttp;
Expand Down
1 change: 1 addition & 0 deletions runtime/js/90_deno_ns.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
serveHttp: __bootstrap.http.serveHttp,
resolveDns: __bootstrap.net.resolveDns,
upgradeWebSocket: __bootstrap.http.upgradeWebSocket,
upgradeHttp: __bootstrap.http.upgradeHttp,
kill: __bootstrap.process.kill,
addSignalListener: __bootstrap.signals.addSignalListener,
removeSignalListener: __bootstrap.signals.removeSignalListener,
Expand Down
80 changes: 79 additions & 1 deletion runtime/ops/http.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use std::cell::RefCell;
use std::rc::Rc;

use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_http::http_create_conn_resource;
use deno_http::HttpRequestReader;
use deno_http::HttpStreamResource;
use deno_net::io::TcpStreamResource;
use deno_net::ops_tls::TlsStream;
use deno_net::ops_tls::TlsStreamResource;
use hyper::upgrade::Parts;
use serde::Serialize;
use tokio::net::TcpStream;

pub fn init() -> Extension {
Extension::builder()
.ops(vec![op_http_start::decl()])
.ops(vec![op_http_start::decl(), op_http_upgrade::decl()])
.build()
}

Expand Down Expand Up @@ -62,3 +72,71 @@ fn op_http_start(

Err(bad_resource_id())
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeResult {
conn_rid: ResourceId,
conn_type: &'static str,
read_buf: ZeroCopyBuf,
}

#[op]
async fn op_http_upgrade(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_: (),
) -> Result<HttpUpgradeResult, AnyError> {
let stream = state
.borrow_mut()
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;

let request = match &mut *rd {
HttpRequestReader::Headers(request) => request,
_ => {
return Err(custom_error(
"Http",
"cannot upgrade because request body was used",
))
}
};

let transport = hyper::upgrade::on(request).await?;
let transport = match transport.downcast::<TcpStream>() {
Ok(Parts {
io: tcp_stream,
read_buf,
..
}) => {
return Ok(HttpUpgradeResult {
conn_type: "tcp",
conn_rid: state
.borrow_mut()
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())),
read_buf: read_buf.to_vec().into(),
});
}
Err(transport) => transport,
};
match transport.downcast::<TlsStream>() {
Ok(Parts {
io: tls_stream,
read_buf,
..
}) => Ok(HttpUpgradeResult {
conn_type: "tls",
conn_rid: state
.borrow_mut()
.resource_table
.add(TlsStreamResource::new(tls_stream.into_split())),
read_buf: read_buf.to_vec().into(),
}),
Err(_) => Err(custom_error(
"Http",
"encountered unsupported transport while upgrading",
)),
}
}

0 comments on commit c5270ab

Please sign in to comment.