Skip to content

Commit

Permalink
refactor: rewrite "net" ops to use serde_v8 (denoland#10028)
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronO committed Apr 6, 2021
1 parent 2c52c0a commit ff5d072
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 175 deletions.
8 changes: 1 addition & 7 deletions runtime/ops/fs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::io::StdFileResource;
use super::utils::into_string;
use crate::fs_util::canonicalize_path;
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
Expand Down Expand Up @@ -108,13 +109,6 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_utime_async", op_utime_async);
}

fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
s.into_string().map_err(|s| {
let message = format!("File name or path {:?} is not valid UTF-8", s);
custom_error("InvalidData", message)
})
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenArgs {
Expand Down
1 change: 1 addition & 0 deletions runtime/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod timers;
pub mod tls;
pub mod tty;
pub mod url;
mod utils;
pub mod web_worker;
pub mod webgpu;
pub mod websocket;
Expand Down
198 changes: 116 additions & 82 deletions runtime/ops/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::AsyncRefCell;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
Expand Down Expand Up @@ -53,6 +51,39 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_dns_resolve", op_dns_resolve);
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OpConn {
pub rid: ResourceId,
pub remote_addr: Option<OpAddr>,
pub local_addr: Option<OpAddr>,
}

#[derive(Serialize)]
#[serde(tag = "transport", rename_all = "lowercase")]
pub enum OpAddr {
Tcp(IpAddr),
Udp(IpAddr),
#[cfg(unix)]
Unix(net_unix::UnixAddr),
#[cfg(unix)]
UnixPacket(net_unix::UnixAddr),
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
/// A received datagram packet (from udp or unixpacket)
pub struct OpPacket {
pub size: usize,
pub remote_addr: OpAddr,
}

#[derive(Serialize)]
pub struct IpAddr {
pub hostname: String,
pub port: u16,
}

#[derive(Deserialize)]
pub(crate) struct AcceptArgs {
pub rid: ResourceId,
Expand All @@ -63,7 +94,7 @@ async fn accept_tcp(
state: Rc<RefCell<OpState>>,
args: AcceptArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpConn, AnyError> {
let rid = args.rid;

let resource = state
Expand Down Expand Up @@ -91,37 +122,36 @@ async fn accept_tcp(
let rid = state
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": "tcp",
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "tcp",
}
}))
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}

async fn op_accept(
state: Rc<RefCell<OpState>>,
args: AcceptArgs,
_buf: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpConn, AnyError> {
match args.transport.as_str() {
"tcp" => accept_tcp(state, args, _buf).await,
#[cfg(unix)]
"unix" => net_unix::accept_unix(state, args, _buf).await,
_ => Err(generic_error(format!(
"Unsupported transport protocol {}",
args.transport
))),
other => Err(bad_transport(other)),
}
}

fn bad_transport(transport: &str) -> AnyError {
generic_error(format!("Unsupported transport protocol {}", transport))
}

#[derive(Deserialize)]
pub(crate) struct ReceiveArgs {
pub rid: ResourceId,
Expand All @@ -132,7 +162,7 @@ async fn receive_udp(
state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpPacket, AnyError> {
let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
let mut zero_copy = zero_copy.clone();

Expand All @@ -149,29 +179,25 @@ async fn receive_udp(
.recv_from(&mut zero_copy)
.try_or_cancel(cancel_handle)
.await?;
Ok(json!({
"size": size,
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": "udp",
}
}))
Ok(OpPacket {
size,
remote_addr: OpAddr::Udp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
}),
})
}

async fn op_datagram_receive(
state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpPacket, AnyError> {
match args.transport.as_str() {
"udp" => receive_udp(state, args, zero_copy).await,
#[cfg(unix)]
"unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
_ => Err(generic_error(format!(
"Unsupported transport protocol {}",
args.transport
))),
other => Err(bad_transport(other)),
}
}

Expand All @@ -187,7 +213,7 @@ async fn op_datagram_send(
state: Rc<RefCell<OpState>>,
args: SendArgs,
zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<usize, AnyError> {
let zero_copy = zero_copy.ok_or_else(null_opbuf)?;
let zero_copy = zero_copy.clone();

Expand Down Expand Up @@ -215,7 +241,7 @@ async fn op_datagram_send(
.ok_or_else(|| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(json!(byte_length))
Ok(byte_length)
}
#[cfg(unix)]
SendArgs {
Expand All @@ -239,7 +265,7 @@ async fn op_datagram_send(
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
Ok(json!(byte_length))
Ok(byte_length)
}
_ => Err(type_error("Wrong argument format!")),
}
Expand All @@ -256,7 +282,7 @@ async fn op_connect(
state: Rc<RefCell<OpState>>,
args: ConnectArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpConn, AnyError> {
match args {
ConnectArgs {
transport,
Expand All @@ -281,19 +307,17 @@ async fn op_connect(
let rid = state_
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
"remoteAddr": {
"hostname": remote_addr.ip().to_string(),
"port": remote_addr.port(),
"transport": transport,
}
}))
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}
#[cfg(unix)]
ConnectArgs {
Expand All @@ -315,17 +339,15 @@ async fn op_connect(
let mut state_ = state.borrow_mut();
let resource = UnixStreamResource::new(unix_stream.into_split());
let rid = state_.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
"remoteAddr": {
"path": remote_addr.as_pathname(),
"transport": transport,
}
}))
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
path: local_addr.as_pathname().and_then(net_unix::pathstring),
})),
remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
path: remote_addr.as_pathname().and_then(net_unix::pathstring),
})),
})
}
_ => Err(type_error("Wrong argument format!")),
}
Expand Down Expand Up @@ -420,7 +442,7 @@ fn op_listen(
state: &mut OpState,
args: ListenArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<OpConn, AnyError> {
let permissions = state.borrow::<Permissions>();
match args {
ListenArgs {
Expand All @@ -447,14 +469,20 @@ fn op_listen(
local_addr.ip().to_string(),
local_addr.port()
);
Ok(json!({
"rid": rid,
"localAddr": {
"hostname": local_addr.ip().to_string(),
"port": local_addr.port(),
"transport": transport,
},
}))
let ip_addr = IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
};
Ok(OpConn {
rid,
local_addr: Some(match transport.as_str() {
"udp" => OpAddr::Udp(ip_addr),
"tcp" => OpAddr::Tcp(ip_addr),
// NOTE: This could be unreachable!()
other => return Err(bad_transport(other)),
}),
remote_addr: None,
})
}
#[cfg(unix)]
ListenArgs {
Expand Down Expand Up @@ -482,13 +510,19 @@ fn op_listen(
rid,
local_addr.as_pathname().unwrap().display(),
);
Ok(json!({
"rid": rid,
"localAddr": {
"path": local_addr.as_pathname(),
"transport": transport,
},
}))
let unix_addr = net_unix::UnixAddr {
path: local_addr.as_pathname().and_then(net_unix::pathstring),
};

Ok(OpConn {
rid,
local_addr: Some(match transport.as_str() {
"unix" => OpAddr::Unix(unix_addr),
"unixpacket" => OpAddr::UnixPacket(unix_addr),
other => return Err(bad_transport(other)),
}),
remote_addr: None,
})
}
#[cfg(unix)]
_ => Err(type_error("Wrong argument format!")),
Expand Down Expand Up @@ -546,7 +580,7 @@ async fn op_dns_resolve(
state: Rc<RefCell<OpState>>,
args: ResolveAddrArgs,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<Value, AnyError> {
) -> Result<Vec<DnsReturnRecord>, AnyError> {
let ResolveAddrArgs {
query,
record_type,
Expand Down Expand Up @@ -584,15 +618,15 @@ async fn op_dns_resolve(

let resolver = AsyncResolver::tokio(config, opts)?;

let results: Vec<DnsReturnRecord> = resolver
let results = resolver
.lookup(query, record_type, Default::default())
.await
.map_err(|e| generic_error(format!("{}", e)))?
.iter()
.filter_map(rdata_to_return_record(record_type))
.collect();

Ok(json!(results))
Ok(results)
}

fn rdata_to_return_record(
Expand Down
Loading

0 comments on commit ff5d072

Please sign in to comment.