Skip to content

Commit

Permalink
remove mutex from resource table
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju committed Feb 8, 2020
1 parent 9f7ef14 commit ee36789
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 43 deletions.
3 changes: 2 additions & 1 deletion cli/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
#![deny(warnings)]

#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -112,7 +113,7 @@ fn create_main_worker(

let state_ = state.clone();
{
let mut resource_table = state_.lock_resource_table();
let mut resource_table = state_.resource_table.borrow_mut();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub fn op_fetch(
}

let body = HttpBody::from(res);
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
let rid = table.add(
"httpBody",
Box::new(StreamResource::HttpBody(Box::new(body))),
Expand Down
12 changes: 5 additions & 7 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ fn op_open(

let fut = async move {
let fs_file = open_options.open(filename).await?;
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
Ok(json!(rid))
};
Expand All @@ -155,7 +155,7 @@ fn op_close(
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;

let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}
Expand Down Expand Up @@ -191,13 +191,11 @@ fn op_seek(
}
};

let mut table = state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?;
let table = state.resource_table.borrow();
let resource = table.get::<StreamResource>(rid).ok_or_else(bad_resource)?;

let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
StreamResource::FsFile(ref file) => file,
_ => return Err(bad_resource()),
};
let mut file = futures::executor::block_on(tokio_file.try_clone())?;
Expand Down
7 changes: 4 additions & 3 deletions cli/ops/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ where
panic!("poll a Read after it's done");
}

let mut table = inner.state.lock_resource_table();
// TODO(bartlomieju): rewrite as poll_fn(|| {})
let mut table = inner.state.resource_table.borrow_mut();
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
Expand Down Expand Up @@ -294,7 +295,7 @@ where
}

if inner.io_state == IoState::Pending {
let mut table = inner.state.lock_resource_table();
let mut table = inner.state.resource_table.borrow_mut();
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
Expand All @@ -309,7 +310,7 @@ where
// Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565
if inner.io_state == IoState::Flush {
let mut table = inner.state.lock_resource_table();
let mut table = inner.state.resource_table.borrow_mut();
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
Expand Down
12 changes: 6 additions & 6 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Future for Accept<'_> {
panic!("poll Accept after it's done");
}

let mut table = inner.state.lock_resource_table();
let mut table = inner.state.resource_table.borrow_mut();
let listener_resource = table
.get_mut::<TcpListenerResource>(inner.rid)
.ok_or_else(|| {
Expand Down Expand Up @@ -102,7 +102,7 @@ fn op_accept(
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
let table = state.lock_resource_table();
let table = state.resource_table.borrow();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
Expand All @@ -111,7 +111,7 @@ fn op_accept(
let (tcp_stream, _socket_addr) = accept(&state_, rid).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok(json!({
Expand Down Expand Up @@ -154,7 +154,7 @@ fn op_connect(
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok(json!({
Expand Down Expand Up @@ -197,7 +197,7 @@ fn op_shutdown(
_ => unimplemented!(),
};

let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let resource = table
.get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?;
Expand Down Expand Up @@ -290,7 +290,7 @@ fn op_listen(
waker: None,
local_addr,
};
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let rid = table.add("tcpListener", Box::new(listener_resource));
debug!(
"New listener {} {}:{}",
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn op_open_plugin(
lib,
ops: HashMap::new(),
};
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let rid = table.add("plugin", Box::new(plugin_resource));
let plugin_resource = table.get_mut::<PluginResource>(rid).unwrap();

Expand Down
6 changes: 3 additions & 3 deletions cli/ops/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn init(i: &mut Isolate, s: &State) {
}

fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, ErrBox> {
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let repr = table
.get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?;
Expand Down Expand Up @@ -127,7 +127,7 @@ fn op_run(
let mut child = c.spawn()?;
let pid = child.id();

let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();

let stdin_rid = match child.stdin.take() {
Some(child_stdin) => {
Expand Down Expand Up @@ -184,7 +184,7 @@ impl Future for ChildStatus {

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
let mut table = inner.state.lock_resource_table();
let mut table = inner.state.resource_table.borrow_mut();
let child_resource = table
.get_mut::<ChildResource>(inner.rid)
.ok_or_else(bad_resource)?;
Expand Down
4 changes: 2 additions & 2 deletions cli/ops/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn op_repl_start(
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
Expand All @@ -61,7 +61,7 @@ fn op_repl_readline(
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
let state = state.clone();
let table = state.lock_resource_table();
let table = state.resource_table.borrow_mut();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();

Expand Down
2 changes: 1 addition & 1 deletion cli/ops/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn op_resources(
_args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let resource_table = state.lock_resource_table();
let resource_table = state.resource_table.borrow();
let serialized_resources = resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}
6 changes: 3 additions & 3 deletions cli/ops/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn op_signal_bind(
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let args: BindSignalArgs = serde_json::from_value(args)?;
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let rid = table.add(
"signal",
Box::new(SignalStreamResource(
Expand All @@ -80,7 +80,7 @@ fn op_signal_poll(
let state_ = state.clone();

let future = poll_fn(move |cx| {
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
if let Some(mut signal) = table.get_mut::<SignalStreamResource>(rid) {
signal.1 = Some(cx.waker().clone());
return signal.0.poll_recv(cx);
Expand All @@ -100,7 +100,7 @@ pub fn op_signal_unbind(
) -> Result<JsonOp, ErrBox> {
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let resource = table.get::<SignalStreamResource>(rid);
if let Some(signal) = resource {
if let Some(waker) = &signal.1 {
Expand Down
10 changes: 5 additions & 5 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub fn op_connect_tls(
let dnsname =
DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let mut table = state_.lock_resource_table();
let mut table = state_.resource_table.borrow_mut();
let rid = table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
Expand Down Expand Up @@ -270,7 +270,7 @@ fn op_listen_tls(
waker: None,
local_addr,
};
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
let rid = table.add("tlsListener", Box::new(tls_listener_resource));

Ok(JsonOp::Sync(json!({
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Future for AcceptTls {
panic!("poll AcceptTls after it's done");
}

let mut table = inner.state.lock_resource_table();
let mut table = inner.state.resource_table.borrow_mut();
let listener_resource = table
.get_mut::<TlsListenerResource>(inner.rid)
.ok_or_else(|| {
Expand Down Expand Up @@ -364,7 +364,7 @@ fn op_accept_tls(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let tls_acceptor = {
let table = state.lock_resource_table();
let table = state.resource_table.borrow();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
Expand All @@ -373,7 +373,7 @@ fn op_accept_tls(
};
let tls_stream = tls_acceptor.accept(tcp_stream).await?;
let rid = {
let mut table = state.lock_resource_table();
let mut table = state.resource_table.borrow_mut();
table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
Expand Down
13 changes: 3 additions & 10 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ use std::rc::Rc;
use std::str;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;

#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
Expand All @@ -53,15 +50,11 @@ pub struct State {
pub next_worker_id: Rc<RefCell<AtomicUsize>>,
pub start_time: Instant,
pub seeded_rng: Option<Rc<RefCell<StdRng>>>,
pub resource_table: Arc<Mutex<ResourceTable>>,
pub resource_table: Rc<RefCell<ResourceTable>>,
pub target_lib: TargetLib,
}

impl State {
pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> {
self.resource_table.lock().unwrap()
}

/// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>(
&self,
Expand Down Expand Up @@ -234,7 +227,7 @@ impl State {
start_time: Instant::now(),
seeded_rng,

resource_table: Arc::new(Mutex::new(ResourceTable::default())),
resource_table: Rc::new(RefCell::new(ResourceTable::default())),
target_lib: TargetLib::Main,
};

Expand Down Expand Up @@ -271,7 +264,7 @@ impl State {
start_time: Instant::now(),
seeded_rng,

resource_table: Arc::new(Mutex::new(ResourceTable::default())),
resource_table: Rc::new(RefCell::new(ResourceTable::default())),
target_lib: TargetLib::Worker,
};

Expand Down

0 comments on commit ee36789

Please sign in to comment.