Skip to content

Commit

Permalink
Merge pull request from GHSA-9f24-jqhm-jfcw
Browse files Browse the repository at this point in the history
* fetch: pull don't push

Signed-off-by: Matteo Collina <[email protected]>

* added tests

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Feb 5, 2024
1 parent b9da3e4 commit 87a4811
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
9 changes: 6 additions & 3 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1099,10 +1099,10 @@ function fetchFinale (fetchParams, response) {

const byteStream = new ReadableStream({
readableStream: transformStream.readable,
async start (controller) {
async pull (controller) {
const reader = this.readableStream.getReader()

while (true) {
while (controller.desiredSize >= 0) {

This comment has been minimized.

Copy link
@KhafraDev

KhafraDev Feb 8, 2024

Member

@mcollina I'm pretty sure this is the only thing needed for the fix, unless node implements its own options (which aren't documented)? queuingStrategy is the second param passed to streams and ReadableStream doesn't have a highWatermark option. So instead it should look like:

const readableStream = new ReadableStream({
 pull () {},
 // ...
}, new ByteLengthQueuingStrategy({ highWaterMark: 16384 }))

This comment has been minimized.

Copy link
@KhafraDev

KhafraDev Feb 8, 2024

Member

I didn't realize this originally, I realized while working on something else that this didn't look right. Maybe it is right, idk

const { done, value } = await reader.read()

if (done) {
Expand All @@ -1113,6 +1113,7 @@ function fetchFinale (fetchParams, response) {
controller.enqueue(value)
}
},
queuingStrategy: new ByteLengthQueuingStrategy({ highWaterMark: 16384 }),
type: 'bytes'
})

Expand Down Expand Up @@ -1927,6 +1928,7 @@ async function httpNetworkFetch (
// cancelAlgorithm set to cancelAlgorithm.
const stream = new ReadableStream(
{
highWaterMark: 16384,
async start (controller) {
fetchParams.controller.controller = controller
},
Expand All @@ -1936,7 +1938,8 @@ async function httpNetworkFetch (
async cancel (reason) {
await cancelAlgorithm(reason)
},
type: 'bytes'
type: 'bytes',
queuingStrategy: new ByteLengthQueuingStrategy({ highWaterMark: 16384 })
}
)

Expand Down
53 changes: 53 additions & 0 deletions test/fetch/pull-dont-push.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const { test } = require('node:test')
const assert = require('node:assert')
const { fetch } = require('../..')
const { createServer } = require('http')
const { once } = require('events')
const { Readable, pipeline } = require('stream')
const { setTimeout: sleep } = require('timers/promises')

const { closeServerAsPromise } = require('../utils/node-http')

test('Allow the usage of custom implementation of AbortController', async (t) => {
let count = 0
let socket
const server = createServer((req, res) => {
res.statusCode = 200
socket = res.socket

// infinite stream
const stream = new Readable({
read () {
this.push('a')
if (count++ > 1000000) {
this.push(null)
}
}
})

pipeline(stream, res, () => {})
})

t.after(closeServerAsPromise(server))

server.listen(0)
await once(server, 'listening')

t.diagnostic('server listening on port %d', server.address().port)
const res = await fetch(`http:https://localhost:${server.address().port}`)
t.diagnostic('fetched')

// Some time is needed to fill the buffer
await sleep(1000)

assert.strictEqual(socket.bytesWritten < 1024 * 1024, true) // 1 MB
socket.destroy()

// consume the stream
try {
/* eslint-disable-next-line no-empty, no-unused-vars */
for await (const chunk of res.body) {}
} catch {}
})

0 comments on commit 87a4811

Please sign in to comment.