Skip to content

Commit

Permalink
fix(ext/http): flush chunk when streaming resource
Browse files Browse the repository at this point in the history
When streaming a resource in ext/http, with compression enabled, we
didn't flush individual chunks. This became very problematic when we
enabled `req.body` from `fetch` for FastStream recently.

This commit now correctly flushes each resource chunk after compression.
  • Loading branch information
lucacasonato committed Nov 4, 2022
1 parent 61fbfab commit bc1743f
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 1 deletion.
77 changes: 77 additions & 0 deletions cli/tests/unit/http_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,83 @@ Deno.test(
},
);

// Regression test for flushing compressed chunks of a streamed resource:
// server() proxies a response it fetches from server2(), which streams its
// body through a TransformStream. With gzip negotiated, each chunk must be
// flushed individually so the client observes "hello" BEFORE the " world"
// chunk is even written — without per-chunk flushing this test deadlocks.
Deno.test({
  name: "http server compresses and flushes each chunk of a streamed resource",
  permissions: { net: true, run: true },
  async fn() {
    const hostname = "localhost";
    const port = 4501;
    const port2 = 4502;

    const encoder = new TextEncoder();
    const listener = Deno.listen({ hostname, port });
    const listener2 = Deno.listen({ hostname, port: port2 });

    let httpConn: Deno.HttpConn;
    // Front server: forwards the client's request to the backend and proxies
    // the (streaming) response body back to the client.
    async function server() {
      const tcpConn = await listener.accept();
      httpConn = Deno.serveHttp(tcpConn);
      const e = await httpConn.nextRequest();
      assert(e);
      const { request, respondWith } = e;
      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
      const resp = await fetch(`http://${hostname}:${port2}/`);
      await respondWith(resp);
      listener.close();
    }

    const ts = new TransformStream();
    const writer = ts.writable.getWriter();
    // Queue the first chunk up front; the second chunk is only written once
    // the client has observed this one (see client() below). Writes are
    // intentionally not awaited — they resolve as the stream is consumed.
    writer.write(encoder.encode("hello"));

    let httpConn2: Deno.HttpConn;
    // Backend server: responds with the readable half of the TransformStream,
    // so the response body is produced incrementally.
    async function server2() {
      const tcpConn = await listener2.accept();
      httpConn2 = Deno.serveHttp(tcpConn);
      const e = await httpConn2.nextRequest();
      assert(e);
      await e.respondWith(
        new Response(ts.readable, {
          headers: { "Content-Type": "text/plain" },
        }),
      );
      listener2.close();
    }

    // Client: curl with --no-buffer so each flushed chunk is delivered as it
    // arrives; the gzip stream is decompressed incrementally on our side.
    async function client() {
      const url = `http://${hostname}:${port}/`;
      const args = [
        "--request",
        "GET",
        "--url",
        url,
        "--header",
        "Accept-Encoding: gzip, deflate, br",
        "--no-buffer",
      ];
      const proc = Deno.spawnChild("curl", { args, stderr: "null" });
      const stdout = proc.stdout
        .pipeThrough(new DecompressionStream("gzip"))
        .pipeThrough(new TextDecoderStream());
      let body = "";
      for await (const chunk of stdout) {
        body += chunk;
        // Only release the second chunk after the first has round-tripped;
        // this proves the server flushed "hello" without waiting for EOF.
        if (body === "hello") {
          writer.write(encoder.encode(" world"));
          writer.close();
        }
      }
      assertEquals(body, "hello world");
      const status = await proc.status;
      assert(status.success);
    }

    await Promise.all([server(), server2(), client()]);
    httpConn!.close();
    httpConn2!.close();
  },
});

function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
Expand Down
6 changes: 5 additions & 1 deletion ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,11 @@ async fn op_http_write_resource(

match &mut *wr {
HttpResponseWriter::Body(body) => {
if let Err(err) = body.write_all(&view).await {
let mut result = body.write_all(&view).await;
if result.is_ok() {
result = body.flush().await;
}
if let Err(err) = result {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
Expand Down

0 comments on commit bc1743f

Please sign in to comment.