Skip to content

Commit

Permalink
Implement closeRead/closeWrite using TcpStream::shutdown (denoland#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinkassimo authored and ry committed Oct 5, 2018
1 parent 6c42ded commit 941e27d
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 7 deletions.
26 changes: 22 additions & 4 deletions js/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ListenerImpl implements Listener {
export interface Conn extends Reader, Writer, Closer {
localAddr: string;
remoteAddr: string;
closeRead(): void;
closeWrite(): void;
}

class ConnImpl implements Conn {
Expand All @@ -80,19 +82,35 @@ class ConnImpl implements Conn {
* Most callers should just use close().
*/
closeRead(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
return notImplemented();
shutdown(this.fd, ShutdownMode.Read);
}

/** closeWrite shuts down (shutdown(2)) the writing side of the TCP
* connection. Most callers should just use close().
*/
closeWrite(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
return notImplemented();
shutdown(this.fd, ShutdownMode.Write);
}
}

enum ShutdownMode {
// See http:https://man7.org/linux/man-pages/man2/shutdown.2.html
// Corresponding to SHUT_RD, SHUT_WR, SHUT_RDWR
Read = 0,
Write,
ReadWrite // unused
}

function shutdown(fd: number, how: ShutdownMode) {
const builder = new flatbuffers.Builder();
msg.Shutdown.startShutdown(builder);
msg.Shutdown.addRid(builder, fd);
msg.Shutdown.addHow(builder, how);
const inner = msg.Shutdown.endShutdown(builder);
const baseRes = dispatch.sendSync(builder, msg.Any.Shutdown, inner);
assert(baseRes == null);
}

/** Listen announces on the local network address.
*
* The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
Expand Down
113 changes: 113 additions & 0 deletions js/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import * as deno from "deno";
import { testPerm, assert, assertEqual } from "./test_util.ts";
import { deferred } from "./util.ts";

testPerm({ net: true }, function netListenClose() {
const listener = deno.listen("tcp", "127.0.0.1:4500");
Expand Down Expand Up @@ -35,3 +36,115 @@ testPerm({ net: true }, async function netDialListen() {
listener.close();
conn.close();
});

testPerm({ net: true }, async function netCloseReadSuccess() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(4, buf[0]);
assertEqual(5, buf[1]);
assertEqual(6, buf[2]);
conn.close();
closeDeferred.resolve();
});
const conn = await deno.dial("tcp", addr);
conn.closeRead(); // closing read
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(0, readResult.nread); // No error, read nothing
assertEqual(true, readResult.eof); // with immediate EOF
// Ensure closeRead does not impact write
await conn.write(new Uint8Array([4, 5, 6]));
await closeDeferred.promise;
listener.close();
conn.close();
});

testPerm({ net: true }, async function netDoubleCloseRead() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeRead(); // closing read
let err;
try {
// Duplicated close should throw error
conn.closeRead();
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.NotConnected);
assertEqual(err.name, "NotConnected");
closeDeferred.resolve();
listener.close();
conn.close();
});

testPerm({ net: true }, async function netCloseWriteSuccess() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeWrite(); // closing write
const buf = new Uint8Array(1024);
// Check read not impacted
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(1, buf[0]);
assertEqual(2, buf[1]);
assertEqual(3, buf[2]);
// Check write should be closed
let err;
try {
await conn.write(new Uint8Array([1, 2, 3]));
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.BrokenPipe);
assertEqual(err.name, "BrokenPipe");
closeDeferred.resolve();
listener.close();
conn.close();
});

testPerm({ net: true }, async function netDoubleCloseWrite() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeWrite(); // closing write
let err;
try {
// Duplicated close should throw error
conn.closeWrite();
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.NotConnected);
assertEqual(err.name, "NotConnected");
closeDeferred.resolve();
listener.close();
conn.close();
});
26 changes: 26 additions & 0 deletions js/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,29 @@ export function containsOnlyASCII(str: string): boolean {
}
return /^[\x00-\x7F]*$/.test(str);
}

// @internal
export interface Deferred {
promise: Promise<void>;
resolve: Function;
reject: Function;
}

/**
* Create a wrapper around a promise that could be
* resolved externally.
* @internal
*/
export function deferred(): Deferred {
let resolve: Function | undefined;
let reject: Function | undefined;
const promise = new Promise<void>((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve: resolve!,
reject: reject!
};
}
6 changes: 6 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ union Any {
Write,
WriteRes,
Close,
Shutdown,
Listen,
ListenRes,
Accept,
Expand Down Expand Up @@ -290,6 +291,11 @@ table Close {
rid: int;
}

table Shutdown {
rid: int;
how: uint;
}

table Listen {
network: string;
address: string;
Expand Down
35 changes: 33 additions & 2 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
use resources;
use resources::Resource;
use tokio_util;

use flatbuffers::FlatBufferBuilder;
Expand All @@ -19,10 +21,9 @@ use hyper;
use hyper::rt::{Future, Stream};
use hyper::Client;
use remove_dir_all::remove_dir_all;
use resources;
use std;
use std::fs;
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
Expand Down Expand Up @@ -84,6 +85,7 @@ pub fn dispatch(
msg::Any::Read => op_read,
msg::Any::Write => op_write,
msg::Any::Close => op_close,
msg::Any::Shutdown => op_shutdown,
msg::Any::Remove => op_remove,
msg::Any::ReadFile => op_read_file,
msg::Any::ReadDir => op_read_dir,
Expand Down Expand Up @@ -614,6 +616,35 @@ fn op_close(
}
}

fn op_shutdown(
_state: Arc<IsolateState>,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
match resources::lookup(rid) {
None => odd_future(errors::new(
errors::ErrorKind::BadFileDescriptor,
String::from("Bad File Descriptor"),
)),
Some(mut resource) => {
let shutdown_mode = match how {
0 => Shutdown::Read,
1 => Shutdown::Write,
_ => unimplemented!(),
};
blocking!(base.sync(), || {
// Use UFCS for disambiguation
Resource::shutdown(&mut resource, shutdown_mode)?;
Ok(empty_buf())
})
}
}
}

fn op_read(
_state: Arc<IsolateState>,
base: &msg::Base,
Expand Down
18 changes: 17 additions & 1 deletion src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.

use errors::DenoError;

use futures;
use futures::Poll;
use std;
use std::collections::HashMap;
use std::io::Error;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
Expand Down Expand Up @@ -79,6 +81,20 @@ impl Resource {
let r = table.remove(&self.rid);
assert!(r.is_some());
}

pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut f) => {
TcpStream::shutdown(f, how).map_err(|err| DenoError::from(err))
}
_ => panic!("Cannot shutdown"),
},
}
}
}

impl Read for Resource {
Expand Down

0 comments on commit 941e27d

Please sign in to comment.