Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web workers #1993

Merged
merged 27 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6481082
Web workers pass 1
afinch7 Mar 23, 2019
368bbfd
Make linter happy
afinch7 Mar 23, 2019
d28a44d
fmt
afinch7 Mar 23, 2019
84e56d8
Refactored the way worker polling is scheduled and errors are handled.
afinch7 Mar 25, 2019
df12a00
Some cleanup and handle worker close on host end.
afinch7 Mar 26, 2019
99f6db1
Merge branch 'master' into web_workers
afinch7 Mar 26, 2019
a1b048c
Share the worker future as a Shared
afinch7 Mar 26, 2019
4f52d6b
Merge branch 'web_workers' of https://github.com/afinch7/deno into we…
afinch7 Mar 26, 2019
8d30513
Fixed some merge issues and added a worker test
afinch7 Mar 26, 2019
ec2149b
compile_sync returns errors now
afinch7 Mar 27, 2019
2103d32
Refactored compile_sync again
afinch7 Mar 27, 2019
1a6be92
fixed bugs and moved compiler error exit back to compile_sync
afinch7 Mar 27, 2019
4eb80ab
lots of comments for compile_sync
afinch7 Mar 28, 2019
1e2c184
refactored compile_sync again to use a future that is less blocking.
afinch7 Mar 28, 2019
cfd3289
Merge branch 'master' into web_workers
afinch7 Mar 28, 2019
e245db1
enable debug to find this problem
afinch7 Mar 28, 2019
58de310
remove old comments
afinch7 Mar 28, 2019
e6a3bab
maybe same tokio runtime for all compiler tasks?
afinch7 Mar 28, 2019
fa72bc9
remove debug from tests
afinch7 Mar 28, 2019
32d7ad4
worker ts lib
afinch7 Mar 30, 2019
a9c8271
requested change: workers test assert
afinch7 Mar 31, 2019
5b101a8
renamed WebWorkerBehavior to UserWorkerBehaivor and moved to workers.rs
afinch7 Mar 31, 2019
8388fa2
Merge remote-tracking branch 'upstream/master' into web_workers
afinch7 Mar 31, 2019
53fcd75
fixed remaining merge issues
afinch7 Mar 31, 2019
6e173b8
remove thread spawn from worker tests
afinch7 Mar 31, 2019
5815927
removed worker specific snapshot/bundle
afinch7 Apr 1, 2019
b0881d4
forgot to remove the startup data function
afinch7 Apr 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 242 additions & 76 deletions cli/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,64 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use core::ops::Deref;
use crate::flags::DenoFlags;
use crate::isolate_state::*;
use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::ops;
use crate::resources;
use crate::resources::Resource;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::workers;
use crate::workers::WorkerBehavior;
use crate::workers::WorkerInit;
use deno::deno_buf;
use deno::Behavior;
use deno::Buf;
use deno::JSError;
use deno::Op;
use deno::StartupData;
use futures::future::*;
use futures::sync::oneshot;
use futures::Future;
use serde_json;
use std::str;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::runtime::Runtime;

/// Used for normalization of types on internal future completions
type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>;
type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>;

/// Shared resources for used to complete compiler operations.
/// rid is the resource id for compiler worker resource used for sending it
/// compile requests
/// worker_err_receiver is a shared future that will compelete when the
/// compiler worker future completes, and send back an error if present
/// or a None if not
#[derive(Clone)]
struct CompilerShared {
afinch7 marked this conversation as resolved.
Show resolved Hide resolved
pub rid: ResourceId,
pub worker_err_receiver: Shared<WorkerErrReceiver>,
}

lazy_static! {
static ref C_RID: Mutex<Option<ResourceId>> = Mutex::new(None);
// Shared worker resources so we can spawn
static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None);
// tokio runtime specifically for spawning logic that is dependent on
// completetion of the compiler worker future
static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
Copy link
Member

@ry ry Mar 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This runtime is different than the tokio runtime? (defined by crate::tokio_util::run)
Ideally we could get all isolates using the same runtime.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, but this leads to tokio waiting indefinitely for the compiler worker to exit. The compiler worker only has two ways to exit right now: calling workerClose as the worker or throwing a uncaught error. Maybe work on a better way to terminate the compiler worker later?

}

pub struct CompilerBehavior {
pub state: Arc<IsolateState>,
}

impl CompilerBehavior {
pub fn new(state: Arc<IsolateState>) -> Self {
Self { state }
pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
Self {
state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
}
}
}

Expand Down Expand Up @@ -65,13 +94,14 @@ impl WorkerBehavior for CompilerBehavior {
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
true,
));
}
}

// This corresponds to JS ModuleMetaData.
// TODO Rename one or the other so they correspond.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ModuleMetaData {
pub module_name: String,
pub filename: String,
Expand Down Expand Up @@ -102,26 +132,60 @@ impl ModuleMetaData {
}
}

fn lazy_start(parent_state: Arc<IsolateState>) -> Resource {
let mut cell = C_RID.lock().unwrap();
let rid = cell.get_or_insert_with(|| {
let resource = workers::spawn(
CompilerBehavior::new(Arc::new(IsolateState::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
None,
))),
"compilerMain()".to_string(),
);
resource.rid
});
Resource { rid: *rid }
fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
let mut cell = C_SHARED.lock().unwrap();
cell
.get_or_insert_with(|| {
let worker_result = workers::spawn(
CompilerBehavior::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
),
"TS",
WorkerInit::Script("compilerMain()".to_string()),
);
match worker_result {
Ok(worker) => {
let rid = worker.resource.rid.clone();
// create oneshot channels and use the sender to pass back
// results from worker future
let (err_sender, err_receiver) =
oneshot::channel::<CompilerInnerResult>();
let mut runtime = C_RUNTIME.lock().unwrap();
runtime.spawn(lazy(move || {
let resource = worker.resource.clone();
worker.then(move |result| -> Result<(), ()> {
resource.close();
match result {
Err(err) => err_sender.send(Err(Some(err))).unwrap(),
_ => err_sender.send(Err(None)).unwrap(),
};
Ok(())
})
}));
CompilerShared {
rid,
worker_err_receiver: err_receiver.shared(),
}
}
Err(err) => {
println!("{}", err.to_string());
std::process::exit(1);
}
}
}).clone()
}

fn req(specifier: &str, referrer: &str) -> Buf {
fn show_compiler_error(err: JSError) -> ModuleMetaData {
eprintln!("{}", JSErrorColor(&err).to_string());
std::process::exit(1);
}

fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
"isWorker": is_worker_main
}).to_string()
.into_boxed_str()
.into_boxed_bytes()
Expand All @@ -133,70 +197,172 @@ pub fn compile_sync(
referrer: &str,
module_meta_data: &ModuleMetaData,
) -> ModuleMetaData {
let req_msg = req(specifier, referrer);

let compiler = lazy_start(parent_state);

let send_future = resources::worker_post_message(compiler.rid, req_msg);
send_future.wait().unwrap();

let recv_future = resources::worker_recv_message(compiler.rid);
let result = recv_future.wait().unwrap();
assert!(result.is_some());
let res_msg = result.unwrap();

let res_json = std::str::from_utf8(&res_msg).unwrap();
match serde_json::from_str::<serde_json::Value>(res_json) {
Ok(serde_json::Value::Object(map)) => ModuleMetaData {
module_name: module_meta_data.module_name.clone(),
filename: module_meta_data.filename.clone(),
media_type: module_meta_data.media_type,
source_code: module_meta_data.source_code.clone(),
maybe_output_code: match map["outputCode"].as_str() {
Some(str) => Some(str.as_bytes().to_owned()),
_ => None,
},
maybe_output_code_filename: None,
maybe_source_map: match map["sourceMap"].as_str() {
Some(str) => Some(str.as_bytes().to_owned()),
_ => None,
},
maybe_source_map_filename: None,
},
_ => panic!("error decoding compiler response"),
let is_worker = parent_state.is_worker.clone();
let shared = lazy_start(parent_state);

let (local_sender, local_receiver) =
oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>();

// Just some extra scoping to keep things clean
{
let compiler_rid = shared.rid.clone();
let module_meta_data_ = module_meta_data.clone();
let req_msg = req(specifier, referrer, is_worker);
let sender_arc = Arc::new(Some(local_sender));
let specifier_ = specifier.clone().to_string();
let referrer_ = referrer.clone().to_string();

let mut runtime = C_RUNTIME.lock().unwrap();
runtime.spawn(lazy(move || {
debug!(
"Running rust part of compile_sync specifier: {} referrer: {}",
specifier_, referrer_
);
let mut send_sender_arc = sender_arc.clone();
resources::post_message_to_worker(compiler_rid, req_msg)
.map_err(move |_| {
let sender = Arc::get_mut(&mut send_sender_arc).unwrap().take();
sender.unwrap().send(Err(None)).unwrap()
}).and_then(move |_| {
debug!(
"Sent message to worker specifier: {} referrer: {}",
specifier_, referrer_
);
let mut get_sender_arc = sender_arc.clone();
let mut result_sender_arc = sender_arc.clone();
resources::get_message_from_worker(compiler_rid)
.map_err(move |_| {
let sender = Arc::get_mut(&mut get_sender_arc).unwrap().take();
sender.unwrap().send(Err(None)).unwrap()
}).and_then(move |res_msg_option| -> Result<(), ()> {
debug!(
"Recieved message from worker specifier: {} referrer: {}",
specifier_, referrer_
);
let res_msg = res_msg_option.unwrap();
let res_json = std::str::from_utf8(&res_msg).unwrap();
let sender = Arc::get_mut(&mut result_sender_arc).unwrap().take();
let sender = sender.unwrap();
Ok(
sender
.send(Ok(match serde_json::from_str::<serde_json::Value>(
res_json,
) {
Ok(serde_json::Value::Object(map)) => ModuleMetaData {
module_name: module_meta_data_.module_name.clone(),
filename: module_meta_data_.filename.clone(),
media_type: module_meta_data_.media_type,
source_code: module_meta_data_.source_code.clone(),
maybe_output_code: match map["outputCode"].as_str() {
Some(str) => Some(str.as_bytes().to_owned()),
_ => None,
},
maybe_output_code_filename: None,
maybe_source_map: match map["sourceMap"].as_str() {
Some(str) => Some(str.as_bytes().to_owned()),
_ => None,
},
maybe_source_map_filename: None,
},
_ => panic!("error decoding compiler response"),
})).unwrap(),
)
})
})
}));
}

let worker_receiver = shared.worker_err_receiver.clone();

let union =
futures::future::select_all(vec![worker_receiver, local_receiver.shared()]);

match union.wait() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to refactor this into a compile_async and compile_sync which calls the async version? I'd like to experiment with parallel compilation at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in another PR? This would require a major rework of communication with the compiler worker. There is no way to relate responses directly to requests currently, and also no way to relate errors back to requests or even handle those errors without the compiler exiting(permanently).

Ok((result, i, rest)) => {
// We got a sucessful finish before any recivers where canceled
let mut rest_mut = rest;
match ((*result.deref()).clone(), i) {
// Either receiver was completed with success.
(Ok(v), _) => v,
// Either receiver was completed with a valid error
// this should be fatal for now since it is not intended
// to be possible to recover from a uncaught error in a isolate
(Err(Some(err)), _) => show_compiler_error(err),
// local_receiver finished first with a none error. This is intended
// to catch when the local logic can't complete because it is unable
// to send and/or receive messages from the compiler worker.
// Due to the way that scheduling works it is very likely that the
// compiler worker future has already or will in the near future
// complete with a valid JSError or a None.
(Err(None), 1) => {
debug!("Compiler local exited with None error!");
// While technically possible to get stuck here indefinately
// in theory it is highly unlikely.
debug!(
"Waiting on compiler worker result specifier: {} referrer: {}!",
specifier, referrer
);
let worker_result =
(*rest_mut.remove(0).wait().unwrap().deref()).clone();
debug!(
"Finished waiting on worker result specifier: {} referrer: {}!",
specifier, referrer
);
match worker_result {
Err(Some(err)) => show_compiler_error(err),
Err(None) => panic!("Compiler exit for an unknown reason!"),
Ok(v) => v,
}
}
// While possible beccause the compiler worker can exit without error
// this shouldn't occurr normally and I don't intend to attempt to
// handle it right now
(_, i) => panic!("Odd compiler result for future {}!", i),
}
}
// This should always a result of a reciver being cancled
// in theory but why not give a print out just in case
Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tokio_util;

#[test]
fn test_compile_sync() {
let cwd = std::env::current_dir().unwrap();
let cwd_string = cwd.to_str().unwrap().to_owned();

let specifier = "./tests/002_hello.ts";
let referrer = cwd_string + "/";

let mut out = ModuleMetaData {
module_name: "xxx".to_owned(),
filename: "/tests/002_hello.ts".to_owned(),
media_type: msg::MediaType::TypeScript,
source_code: "console.log(\"Hello World\");".as_bytes().to_owned(),
maybe_output_code_filename: None,
maybe_output_code: None,
maybe_source_map_filename: None,
maybe_source_map: None,
};

out =
compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out);
assert!(
out
.maybe_output_code
.unwrap()
.starts_with("console.log(\"Hello World\");".as_bytes())
);
tokio_util::init(|| {
let cwd = std::env::current_dir().unwrap();
let cwd_string = cwd.to_str().unwrap().to_owned();

let specifier = "./tests/002_hello.ts";
let referrer = cwd_string + "/";

let mut out = ModuleMetaData {
module_name: "xxx".to_owned(),
filename: "/tests/002_hello.ts".to_owned(),
media_type: msg::MediaType::TypeScript,
source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
maybe_output_code_filename: None,
maybe_output_code: None,
maybe_source_map_filename: None,
maybe_source_map: None,
};

out = compile_sync(
Arc::new(IsolateState::mock()),
specifier,
&referrer,
&out,
);
assert!(
out
.maybe_output_code
.unwrap()
.starts_with("console.log(\"Hello World\");".as_bytes())
);
});
}
}
Loading