Skip to content

Commit

Permalink
Add waker to StreamResource to fix hang on close bugs (denoland#4293)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsouto18 committed Mar 11, 2020
1 parent 810e4a1 commit fb5c314
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 78 deletions.
43 changes: 43 additions & 0 deletions cli/js/tests/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,46 @@ unitTest(
conn.close();
}
);

unitTest(
{
perms: { net: true }
},
async function netHangsOnClose() {
let acceptedConn: Deno.Conn;
const resolvable = createResolvable();

async function iteratorReq(listener: Deno.Listener): Promise<void> {
const p = new Uint8Array(10);
const conn = await listener.accept();
acceptedConn = conn;

try {
while (true) {
const nread = await conn.read(p);
if (nread === Deno.EOF) {
break;
}
await conn.write(new Uint8Array([1, 2, 3]));
}
} catch (err) {
assert(!!err);
assert(err instanceof Deno.errors.BadResource);
}

resolvable.resolve();
}

const addr = { hostname: "127.0.0.1", port: 4500 };
const listener = Deno.listen(addr);
iteratorReq(listener);
const conn = await Deno.connect(addr);
await conn.write(new Uint8Array([1, 2, 3, 4]));
const buf = new Uint8Array(10);
await conn.read(buf);
conn!.close();
acceptedConn!.close();
listener.close();
await resolvable;
}
);
6 changes: 4 additions & 2 deletions cli/ops/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::http_util::{create_http_client, HttpBody};
use crate::op_error::OpError;
use crate::state::State;
Expand Down Expand Up @@ -80,7 +80,9 @@ pub fn op_fetch(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
Box::new(body),
))),
);

let json_res = json!({
Expand Down
13 changes: 8 additions & 5 deletions cli/ops/fs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
use super::io::{FileMetadata, StreamResource};
use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
use crate::fs as deno_fs;
use crate::op_error::OpError;
use crate::ops::dispatch_json::JsonResult;
Expand Down Expand Up @@ -153,7 +153,10 @@ fn op_open(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"fsFile",
Box::new(StreamResource::FsFile(fs_file, FileMetadata::default())),
Box::new(StreamResourceHolder::new(StreamResource::FsFile(
fs_file,
FileMetadata::default(),
))),
);
Ok(json!(rid))
};
Expand Down Expand Up @@ -198,12 +201,12 @@ fn op_seek(
};

let state = state.borrow();
let resource = state
let resource_holder = state
.resource_table
.get::<StreamResource>(rid)
.get::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;

let tokio_file = match resource {
let tokio_file = match resource_holder.resource {
StreamResource::FsFile(ref file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down
100 changes: 86 additions & 14 deletions cli/ops/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::ready;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) {
);
}

pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default());
let stdout = StreamResource::Stdout({
pub fn get_stdio() -> (
StreamResourceHolder,
StreamResourceHolder,
StreamResourceHolder,
) {
let stdin = StreamResourceHolder::new(StreamResource::Stdin(
tokio::io::stdin(),
TTYMetadata::default(),
));
let stdout = StreamResourceHolder::new(StreamResource::Stdout({
let stdout = STDOUT_HANDLE
.try_clone()
.expect("Unable to clone stdout handle");
tokio::fs::File::from_std(stdout)
});
let stderr = StreamResource::Stderr(tokio::io::stderr());
}));
let stderr =
StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));

(stdin, stdout, stderr)
}
Expand All @@ -87,6 +97,51 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}

pub struct StreamResourceHolder {
pub resource: StreamResource,
waker: HashMap<usize, futures::task::AtomicWaker>,
waker_counter: AtomicUsize,
}

impl StreamResourceHolder {
pub fn new(resource: StreamResource) -> StreamResourceHolder {
StreamResourceHolder {
resource,
// Atleast one task is expecter for the resource
waker: HashMap::with_capacity(1),
// Tracks wakers Ids
waker_counter: AtomicUsize::new(0),
}
}
}

impl Drop for StreamResourceHolder {
fn drop(&mut self) {
self.wake_tasks();
}
}

impl StreamResourceHolder {
pub fn track_task(&mut self, cx: &Context) -> Result<usize, OpError> {
let waker = futures::task::AtomicWaker::new();
waker.register(cx.waker());
// Its OK if it overflows
let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
self.waker.insert(task_waker_id, waker);
Ok(task_waker_id)
}

pub fn wake_tasks(&mut self) {
for waker in self.waker.values() {
waker.wake();
}
}

pub fn untrack_task(&mut self, task_waker_id: usize) {
self.waker.remove(&task_waker_id);
}
}

pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
Stdout(tokio::fs::File),
Expand Down Expand Up @@ -150,10 +205,27 @@ pub fn op_read(

poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;

let mut task_tracker_id: Option<usize> = None;
let nread = match resource_holder
.resource
.poll_read(cx, &mut buf.as_mut()[..])
.map_err(OpError::from)
{
Poll::Ready(t) => {
if let Some(id) = task_tracker_id {
resource_holder.untrack_task(id);
}
t
}
Poll::Pending => {
task_tracker_id.replace(resource_holder.track_task(cx)?);
return Poll::Pending;
}
}?;
Poll::Ready(Ok(nread as i32))
})
.boxed_local()
Expand Down Expand Up @@ -233,10 +305,10 @@ pub fn op_write(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_write(cx, &buf.as_ref()[..])
resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;

Expand All @@ -246,10 +318,10 @@ pub fn op_write(
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
let resource_holder = resource_table
.get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
resource.poll_flush(cx)
resource_holder.resource.poll_flush(cx)
})
.await?;

Expand Down
26 changes: 16 additions & 10 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -78,9 +78,12 @@ fn op_accept(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -207,9 +210,12 @@ fn op_connect(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
let rid = state
.resource_table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = state.resource_table.add(
"tcpStream",
Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
tcp_stream,
))),
);
Ok(json!({
"rid": rid,
"localAddr": {
Expand Down Expand Up @@ -251,11 +257,11 @@ fn op_shutdown(
};

let mut state = state.borrow_mut();
let resource = state
let resource_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
match resource {
match resource_holder.resource {
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
Expand Down
20 changes: 13 additions & 7 deletions cli/ops/process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::signal::kill;
use crate::state::State;
Expand All @@ -24,11 +24,11 @@ pub fn init(i: &mut Isolate, s: &State) {

fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
let mut state = state.borrow_mut();
let repr = state
let repr_holder = state
.resource_table
.get_mut::<StreamResource>(rid)
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
let file = match repr {
let file = match repr_holder.resource {
StreamResource::FsFile(ref mut file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
Expand Down Expand Up @@ -127,7 +127,9 @@ fn op_run(
Some(child_stdin) => {
let rid = table.add(
"childStdin",
Box::new(StreamResource::ChildStdin(child_stdin)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
child_stdin,
))),
);
Some(rid)
}
Expand All @@ -138,7 +140,9 @@ fn op_run(
Some(child_stdout) => {
let rid = table.add(
"childStdout",
Box::new(StreamResource::ChildStdout(child_stdout)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
child_stdout,
))),
);
Some(rid)
}
Expand All @@ -149,7 +153,9 @@ fn op_run(
Some(child_stderr) => {
let rid = table.add(
"childStderr",
Box::new(StreamResource::ChildStderr(child_stderr)),
Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
child_stderr,
))),
);
Some(rid)
}
Expand Down
10 changes: 7 additions & 3 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
Expand Down Expand Up @@ -85,7 +85,9 @@ pub fn op_connect_tls(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
Box::new(tls_stream),
))),
);
Ok(json!({
"rid": rid,
Expand Down Expand Up @@ -318,7 +320,9 @@ fn op_accept_tls(
let mut state = state.borrow_mut();
state.resource_table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
Box::new(tls_stream),
))),
)
};
Ok(json!({
Expand Down
Loading

0 comments on commit fb5c314

Please sign in to comment.