Skip to content

Commit

Permalink
Merge pull request JuliaLang#17522 from JuliaLang/jn/reliable-flush-c…
Browse files Browse the repository at this point in the history
…lose

reliable flush/close on streams, and avoid zombies
  • Loading branch information
vtjnash authored Aug 4, 2016
2 parents 1eeb773 + 67efdcf commit 08ae28a
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 34 deletions.
19 changes: 13 additions & 6 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ type Process <: AbstractPipe
typemin(fieldtype(Process, :termsignal)),
false, Condition(), false, Condition())
finalizer(this, uvfinalize)
this
return this
end
end
pipe_reader(p::Process) = p.out
Expand Down Expand Up @@ -325,9 +325,12 @@ function _jl_spawn(cmd, argv, loop::Ptr{Void}, pp::Process,
end

function uvfinalize(proc::Process)
proc.handle != C_NULL && ccall(:jl_close_uv, Void, (Ptr{Void},), proc.handle)
disassociate_julia_struct(proc)
proc.handle = C_NULL
if proc.handle != C_NULL
disassociate_julia_struct(proc.handle)
ccall(:jl_close_uv, Void, (Ptr{Void},), proc.handle)
proc.handle = C_NULL
end
nothing
end

function uv_return_spawn(p::Ptr{Void}, exit_status::Int64, termsignal::Int32)
Expand All @@ -336,15 +339,19 @@ function uv_return_spawn(p::Ptr{Void}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
if isa(proc.exitcb, Function) proc.exitcb(proc, exit_status, termsignal) end
if isa(proc.exitcb, Function)
proc.exitcb(proc, exit_status, termsignal)
end
ccall(:jl_close_uv, Void, (Ptr{Void},), proc.handle)
notify(proc.exitnotify)
nothing
end

function _uv_hook_close(proc::Process)
proc.handle = C_NULL
if isa(proc.closecb, Function) proc.closecb(proc) end
if isa(proc.closecb, Function)
proc.closecb(proc)
end
notify(proc.closenotify)
end

Expand Down
14 changes: 3 additions & 11 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ function TCPSocket()
throw(UVError("failed to create tcp socket",err))
end
this.status = StatusInit
this
return this
end

type TCPServer <: LibuvServer
Expand Down Expand Up @@ -312,7 +312,7 @@ function TCPServer()
throw(UVError("failed to create tcp server",err))
end
this.status = StatusInit
this
return this
end

isreadable(io::TCPSocket) = isopen(io) || nb_available(io) > 0
Expand Down Expand Up @@ -365,19 +365,11 @@ function UDPSocket()
throw(UVError("failed to create udp socket",err))
end
this.status = StatusInit
this
return this
end

show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")

function uvfinalize(uv::Union{TTY,PipeEndpoint,PipeServer,TCPServer,TCPSocket,UDPSocket})
if (uv.status != StatusUninit && uv.status != StatusInit)
close(uv)
end
disassociate_julia_struct(uv)
uv.handle = C_NULL
end

function _uv_hook_close(sock::UDPSocket)
sock.handle = C_NULL
sock.status = StatusClosed
Expand Down
31 changes: 26 additions & 5 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,26 @@ function wait_close(x::Union{LibuvStream, LibuvServer})
end

function close(stream::Union{LibuvStream, LibuvServer})
if isopen(stream) && stream.status != StatusClosing
ccall(:jl_close_uv,Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
if isopen(stream)
if stream.status != StatusClosing
ccall(:jl_close_uv, Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
end
if uv_handle_data(stream) != C_NULL
stream_wait(stream, stream.closenotify)
end
end
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit && uv.status != StatusInit
close(uv)
uv.handle = C_NULL
uv.status = StatusClosed
end
end
nothing
end
Expand Down Expand Up @@ -472,8 +489,10 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void})
stream.status = StatusEOF # libuv called stop_reading already
notify(stream.readnotify)
notify(stream.closenotify)
else
close(stream)
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
end
else
# This is a fatal connection error. Shutdown requests as per the usual
Expand Down Expand Up @@ -1019,6 +1038,8 @@ function close(s::BufferStream)
notify(s.close_c; all=true)
nothing
end
uvfinalize(s::BufferStream) = nothing

read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8))
unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) = (wait_readnb(s, Int(nb)); unsafe_read(s.buffer, a, nb))
nb_available(s::BufferStream) = nb_available(s.buffer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
c6a019d79d20eabc39619a04961c9a3b
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
478ab473244b01bef344892a75e09fef50da8fb1a7212e0257c53f3223de4fde5f6bd449eef34bc1f025481c7d9f854002acb6eb203b447a50a34bae4ad9dee4

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion deps/libuv.version
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
LIBUV_BRANCH=julia-uv1.9.0
LIBUV_SHA1=ecbd6eddfac4940ab8db57c73166a7378563ebd3
LIBUV_SHA1=cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68
9 changes: 8 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ static struct uv_shutdown_queue_item *next_shutdown_queue_item(struct uv_shutdow

void jl_init_timing(void);
void jl_destroy_timing(void);
void jl_uv_call_close_callback(jl_value_t *val);

JL_DLLEXPORT void jl_atexit_hook(int exitcode)
{
Expand Down Expand Up @@ -270,6 +271,13 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)
continue;
}
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
case UV_TTY:
case UV_UDP:
case UV_TCP:
Expand All @@ -283,7 +291,6 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)
case UV_PREPARE:
case UV_CHECK:
case UV_SIGNAL:
case UV_PROCESS:
case UV_FILE:
// These will be shutdown as appropriate by jl_close_uv
jl_close_uv(handle);
Expand Down
23 changes: 18 additions & 5 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ void jl_init_signal_async(void)
}
#endif

static void jl_uv_call_close_callback(jl_value_t *val)
void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
args[1] = val;
assert(args[0]);
jl_apply(args, 2);
jl_apply(args, 2); // TODO: wrap in try-catch?
}

JL_DLLEXPORT void jl_uv_closeHandle(uv_handle_t *handle)
static void jl_uv_closeHandle(uv_handle_t *handle)
{
// if the user killed a stdio handle,
// revert back to direct stdio FILE* writes
Expand All @@ -107,7 +107,7 @@ JL_DLLEXPORT void jl_uv_closeHandle(uv_handle_t *handle)
free(handle);
}

JL_DLLEXPORT void jl_uv_shutdownCallback(uv_shutdown_t *req, int status)
static void jl_uv_shutdownCallback(uv_shutdown_t *req, int status)
{
/*
* This happens if the remote machine closes the connecition while we're
Expand Down Expand Up @@ -180,8 +180,21 @@ JL_DLLEXPORT int jl_init_pipe(uv_pipe_t *pipe, int writable, int readable,
return err;
}

static void jl_proc_exit_cleanup(uv_process_t *process, int64_t exit_status, int term_signal)
{
uv_close((uv_handle_t*)process, (uv_close_cb)&free);
}

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup;
return;
}

if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
Expand Down Expand Up @@ -230,7 +243,7 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
uv_close(handle,&jl_uv_closeHandle);
uv_close(handle, &jl_uv_closeHandle);
}

JL_DLLEXPORT void jl_uv_associate_julia_struct(uv_handle_t *handle,
Expand Down
10 changes: 7 additions & 3 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ let bad = "bad\0name"
end

# issue #12829
let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", readstring(STDIN))'`, ready = Condition()
let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", readstring(STDIN))'`, ready = Condition(), t
@test_throws ArgumentError write(out, "not open error")
@async begin # spawn writer task
t = @async begin # spawn writer task
open(echo, "w", out) do in1
open(echo, "w", out) do in2
notify(ready)
Expand All @@ -282,10 +282,13 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", r
@test isreadable(out)
@test iswritable(out)
close(out.in)
@test !isopen(out.in)
is_windows() || @test !isopen(out.out) # it takes longer to propagate EOF through the Windows event system
@test_throws ArgumentError write(out, "now closed error")
@test isreadable(out)
@test !iswritable(out)
@test isopen(out)
is_windows() && Base.process_events(false) # should be enough steps to fully propagate EOF now
@test !isopen(out)
end
wait(ready) # wait for writer task to be ready before using `out`
@test nb_available(out) == 0
Expand All @@ -308,6 +311,7 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", r
@test isempty(read(out))
@test eof(out)
@test desc == "Pipe(open => active, 0 bytes waiting)"
wait(t)
end

# issue #8529
Expand Down

0 comments on commit 08ae28a

Please sign in to comment.