Skip to content

Commit

Permalink
fix(ext/node): return cancelled flag in get_response_body_chunk op (d…
Browse files Browse the repository at this point in the history
…enoland#23962)

The flag lets us exit from read loop without throwing an error when
the stream is cancelled.

This fixes gRPC cancellation example.

Co-authored-by: Bartek Iwańczuk <[email protected]>
  • Loading branch information
satyarohith and bartlomieju committed May 23, 2024
1 parent 0a30897 commit e450c6b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
19 changes: 15 additions & 4 deletions ext/node/ops/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use deno_core::ResourceId;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use h2;
use h2::Reason;
use h2::RecvStream;
use http_v02;
use http_v02::request::Parts;
Expand Down Expand Up @@ -496,17 +497,27 @@ fn poll_data_or_trailers(
pub async fn op_http2_client_get_response_body_chunk(
state: Rc<RefCell<OpState>>,
#[smi] body_rid: ResourceId,
) -> Result<(Option<Vec<u8>>, bool), AnyError> {
) -> Result<(Option<Vec<u8>>, bool, bool), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientResponseBody>(body_rid)?;
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;

loop {
match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? {
let result = poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await;
if let Err(err) = result {
let reason = err.reason();
if let Some(reason) = reason {
if reason == Reason::CANCEL {
return Ok((None, false, true));
}
}
return Err(err.into());
}
match result.unwrap() {
DataOrTrailers::Data(data) => {
return Ok((Some(data.to_vec()), false));
return Ok((Some(data.to_vec()), false, false));
}
DataOrTrailers::Trailers(trailers) => {
if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx)
Expand All @@ -524,7 +535,7 @@ pub async fn op_http2_client_get_response_body_chunk(
.borrow_mut()
.await
.take();
return Ok((None, true));
return Ok((None, true, false));
}
};
}
Expand Down
11 changes: 8 additions & 3 deletions ext/node/polyfills/http2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,14 @@ export class ClientHttp2Stream extends Duplex {
debugHttp2(">>> read");

(async () => {
const [chunk, finished] = await op_http2_client_get_response_body_chunk(
this[kDenoResponse].bodyRid,
);
const [chunk, finished, cancelled] =
await op_http2_client_get_response_body_chunk(
this[kDenoResponse].bodyRid,
);

if (cancelled) {
return;
}

debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid);
if (chunk === null) {
Expand Down

0 comments on commit e450c6b

Please sign in to comment.