diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index fd1203f53b61e8..3c25265d74378e 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -394,7 +394,7 @@ impl PollFrame for GZipResponseStream { stm.compress(&[], &mut buf, flate2::FlushCompress::Finish) } ResponseStreamResult::NonEmptyBuf(mut input) => { - let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None); + let res = stm.compress(&input, &mut buf, flate2::FlushCompress::Sync); let len_in = (stm.total_in() - start_in) as usize; debug_assert!(len_in <= input.len()); this.crc.update(&input[..len_in]); diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 6b862ce837793e..ec3fe6e0b4f67f 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -671,6 +671,9 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const res = await op_fetch_send(this._req.requestRid); + if (this._req.cancelHandleRid !== null) { + core.tryClose(this._req.cancelHandleRid); + } try { cb?.(); } catch (_) { @@ -709,10 +712,6 @@ class ClientRequest extends OutgoingMessage { Object.entries(res.headers).flat().length, ); - if (this._req.cancelHandleRid !== null) { - core.tryClose(this._req.cancelHandleRid); - } - if (incoming.upgrade) { if (this.listenerCount("upgrade") === 0) { // No listeners, so we got nothing to do diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index f7f01076bef042..ff77578e613a76 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -11,6 +11,7 @@ import { curlRequest, curlRequestWithStdErr, execCode, + execCode3, fail, tmpUnixSocketPath, } from "./test_util.ts"; @@ -3985,3 +3986,59 @@ Deno.test( assert(respText === "Internal Server Error"); }, ); + +Deno.test( + { + permissions: { net: true, run: true, read: true }, + ignore: Deno.build.os !== "linux", + }, + async function gzipFlushResponseStream() { + const { promise, resolve } = Promise.withResolvers(); + const ac = new AbortController(); + + console.log("Starting server", servePort); + let timer: number | undefined = undefined; + let _controller; + + const server = Deno.serve( + { + port: servePort, + onListen: onListen(resolve), + signal: ac.signal, + }, + () => { + const body = new ReadableStream({ + start(controller) { + timer = setInterval(() => { + const message = `It is ${new Date().toISOString()}\n`; + controller.enqueue(new TextEncoder().encode(message)); + }, 1000); + _controller = controller; + }, + cancel() { + if (timer !== undefined) { + clearInterval(timer); + } + }, + }); + return new Response(body, { + headers: { + "content-type": "text/plain", + "x-content-type-options": "nosniff", + }, + }); + }, + ); + await promise; + const e = await execCode3("/usr/bin/sh", [ + "-c", + `curl --stderr - -N --compressed --no-progress-meter http://localhost:${servePort}`, + ]); + await e.waitStdoutText("It is "); + clearTimeout(timer); + _controller!.close(); + await e.finished(); + ac.abort(); + await server.finished; + }, +); diff --git a/tests/unit/test_util.ts b/tests/unit/test_util.ts index ba9bf1839a6fdc..db2585ebd10242 100644 --- a/tests/unit/test_util.ts +++ b/tests/unit/test_util.ts @@ -35,14 +35,9 @@ export function execCode(code: string): Promise { return execCode2(code).finished(); } -export function execCode2(code: string) { - const command = new Deno.Command(Deno.execPath(), { - args: [ - "eval", - "--unstable", - "--no-check", - code, - ], +export function execCode3(cmd: string, args: string[]) { + const command = new Deno.Command(cmd, { + args, stdout: "piped", stderr: "inherit", }); @@ -82,6 +77,10 @@ export function execCode2(code: string) { }; } +export function execCode2(code: string) { + return execCode3(Deno.execPath(), ["eval", "--unstable", "--no-check", code]); +} + export function tmpUnixSocketPath(): string { const folder = Deno.makeTempDirSync(); return join(folder, "socket");