Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove tokio_util::block_on from ops/workers.rs #3381

Merged
merged 1 commit into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/compilers/ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ impl TsCompiler {

let worker = TsCompiler::setup_worker(global_state.clone());
let worker_ = worker.clone();
worker.post_message(req_msg).unwrap();
let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await;
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down Expand Up @@ -382,8 +382,8 @@ impl TsCompiler {
.add("Compile", &module_url.to_string());
let global_state_ = global_state.clone();

worker.post_message(req_msg).unwrap();
let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await;
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down
13 changes: 7 additions & 6 deletions cli/compilers/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ impl WasmCompiler {
let worker_ = worker.clone();
let url = source_file.url.clone();

let _res = worker.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
);
let fut = worker
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.then(|_| worker)
.then(move |result| {
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down
35 changes: 22 additions & 13 deletions cli/ops/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use crate::worker::Worker;
use deno::*;
use futures;
Expand All @@ -20,6 +19,7 @@ use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::task::Context;
use std::task::Poll;

Expand Down Expand Up @@ -153,23 +153,31 @@ fn op_create_worker(
js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()"));

let exec_cb = move |worker: Worker| {
let worker_id = parent_state.add_child_worker(worker);
json!(worker_id)
};
let worker_id = parent_state.add_child_worker(worker.clone());
let response = json!(worker_id);

// Has provided source code, execute immediately.
if has_source_code {
js_check(worker.execute(&source_code));
return Ok(JsonOp::Sync(exec_cb(worker)));
return Ok(JsonOp::Sync(response));
}

let op = worker
// TODO(bartlomieju): this should spawn mod execution on separate tokio task
// and block on receving message on a channel or even use sync channel /shrug
let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
let fut = worker
.execute_mod_async(&module_specifier, None, false)
.and_then(move |()| futures::future::ok(exec_cb(worker)));

let result = tokio_util::block_on(op.boxed())?;
Ok(JsonOp::Sync(result))
.then(move |result| {
sender.send(result).expect("Failed to send message");
futures::future::ok(())
})
.boxed()
.compat();
tokio::spawn(fut);

let result = receiver.recv().expect("Failed to receive message");
result?;
Ok(JsonOp::Sync(response))
}

struct GetWorkerClosedFuture {
Expand Down Expand Up @@ -271,9 +279,10 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
worker
let fut = worker
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({})))
}

Expand Down
39 changes: 19 additions & 20 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,17 @@ impl Worker {
/// Post message to worker as a host.
///
/// This method blocks current thread.
pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
pub fn post_message(
self: &Self,
buf: Buf,
) -> impl Future<Output = Result<(), ErrBox>> {
let channels = self.external_channels.lock().unwrap();
let mut sender = channels.sender.clone();
futures::executor::block_on(sender.send(buf))
.map(|_| ())
.map_err(ErrBox::from)
async move {
let result = sender.send(buf).map_err(ErrBox::from).await;
drop(sender);
result
}
}

/// Get message from worker as a host.
Expand Down Expand Up @@ -199,6 +204,7 @@ mod tests {
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::executor::block_on;
use std::sync::atomic::Ordering;

#[test]
Expand Down Expand Up @@ -391,11 +397,10 @@ mod tests {

let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();

let r = worker_.post_message(msg);
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());

let maybe_msg =
futures::executor::block_on(worker_.get_message()).unwrap();
let maybe_msg = block_on(worker_.get_message()).unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
Expand All @@ -404,7 +409,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = worker_.post_message(msg);
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());
})
}
Expand Down Expand Up @@ -434,10 +439,10 @@ mod tests {
);

let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = worker_.post_message(msg);
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());

futures::executor::block_on(worker_future).unwrap();
block_on(worker_future).unwrap();
})
}

Expand All @@ -448,11 +453,8 @@ mod tests {
let mut worker = create_test_worker();
let module_specifier =
ModuleSpecifier::resolve_url_or_path("does-not-exist").unwrap();
let result = futures::executor::block_on(worker.execute_mod_async(
&module_specifier,
None,
false,
));
let result =
block_on(worker.execute_mod_async(&module_specifier, None, false));
assert!(result.is_err());
})
}
Expand All @@ -470,11 +472,8 @@ mod tests {
.to_owned();
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let result = futures::executor::block_on(worker.execute_mod_async(
&module_specifier,
None,
false,
));
let result =
block_on(worker.execute_mod_async(&module_specifier, None, false));
assert!(result.is_ok());
})
}
Expand Down