Skip to content

Commit

Permalink
add locks for libuv (#31437)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed Mar 22, 2019
1 parent 6a28aa9 commit ad40952
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 52 deletions.
6 changes: 3 additions & 3 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ mutable struct Timer
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)

ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
ccall(:jl_uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
this, uv_jl_timercb::Ptr{Cvoid},
UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
return this
Expand All @@ -266,7 +266,7 @@ isopen(t::Union{Timer, AsyncCondition}) = t.isopen
function close(t::Union{Timer, AsyncCondition})
if t.handle != C_NULL && isopen(t)
t.isopen = false
isa(t, Timer) && ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), t)
isa(t, Timer) && ccall(:jl_uv_timer_stop, Cint, (Ptr{Cvoid},), t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
nothing
Expand Down
4 changes: 2 additions & 2 deletions base/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ function readdir(path::AbstractString)
uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ()))

# defined in sys.c, to call uv_fs_readdir, which sets errno on error.
err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}),
eventloop(), uv_readdir_req, path, 0, C_NULL)
err < 0 && throw(SystemError("unable to read directory $path", -err))
#uv_error("unable to read directory $path", err)
Expand Down Expand Up @@ -808,7 +808,7 @@ Return the target location a symbolic link `path` points to.
function readlink(path::AbstractString)
req = Libc.malloc(_sizeof_uv_fs)
try
ret = ccall(:uv_fs_readlink, Int32,
ret = ccall(:jl_uv_fs_readlink, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
eventloop(), req, path, C_NULL)
if ret < 0
Expand Down
6 changes: 3 additions & 3 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0)
req = Libc.malloc(_sizeof_uv_fs)
local handle
try
ret = ccall(:uv_fs_open, Int32,
ret = ccall(:jl_uv_fs_open, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}),
eventloop(), req, path, flags, mode, C_NULL)
handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req)
Expand Down Expand Up @@ -131,7 +131,7 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c))
function truncate(f::File, n::Integer)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:uv_fs_ftruncate, Int32,
err = ccall(:jl_uv_fs_ftruncate, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}),
eventloop(), req, f.handle, n, C_NULL)
Libc.free(req)
Expand All @@ -142,7 +142,7 @@ end
function futime(f::File, atime::Float64, mtime::Float64)
check_open(f)
req = Libc.malloc(_sizeof_uv_fs)
err = ccall(:uv_fs_futime, Int32,
err = ccall(:jl_uv_fs_futime, Int32,
(Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}),
eventloop(), req, f.handle, atime, mtime, C_NULL)
Libc.free(req)
Expand Down
6 changes: 3 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
((bytesavailable(stream.buffer) >= stream.throttle) ||
(bytesavailable(stream.buffer) >= stream.buffer.maxsize)))
# save cycles by stopping kernel notifications from arriving
ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream)
stream.status = StatusOpen
end
nothing
Expand Down Expand Up @@ -712,7 +712,7 @@ function start_reading(stream::LibuvStream)
# libuv may call the alloc callback immediately
# for a TTY on Windows, so ensure the status is set first
stream.status = StatusActive
ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
ret = ccall(:jl_uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid})
return ret
elseif stream.status == StatusPaused
Expand All @@ -734,7 +734,7 @@ if Sys.iswindows()
function stop_reading(stream::LibuvStream)
if stream.status == StatusActive
stream.status = StatusOpen
ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream)
ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream)
end
nothing
end
Expand Down
14 changes: 13 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,11 @@ static void jl_uv_exitcleanup_add(uv_handle_t *handle, struct uv_shutdown_queue
struct uv_shutdown_queue_item *item = (struct uv_shutdown_queue_item*)malloc(sizeof(struct uv_shutdown_queue_item));
item->h = handle;
item->next = NULL;
JL_UV_LOCK();
if (queue->last) queue->last->next = item;
if (!queue->first) queue->first = item;
queue->last = item;
JL_UV_UNLOCK();
}

static void jl_uv_exitcleanup_walk(uv_handle_t *handle, void *arg)
Expand Down Expand Up @@ -259,6 +261,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)
}

struct uv_shutdown_queue queue = {NULL, NULL};
JL_UV_LOCK();
uv_walk(loop, jl_uv_exitcleanup_walk, &queue);
struct uv_shutdown_queue_item *item = queue.first;
if (ptls->current_task != NULL) {
Expand Down Expand Up @@ -288,6 +291,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)

// force libuv to spin until everything has finished closing
loop->stop_flag = 0;
JL_UV_UNLOCK();
while (uv_run(loop, UV_RUN_DEFAULT)) { }

// TODO: Destroy threads
Expand Down Expand Up @@ -406,23 +410,31 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable)
break;
case UV_NAMED_PIPE:
handle = malloc(sizeof(uv_pipe_t));
JL_UV_LOCK();
if ((err = uv_pipe_init(jl_io_loop, (uv_pipe_t*)handle, 0))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_pipe_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
if ((err = uv_pipe_open((uv_pipe_t*)handle, fd))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_pipe_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
((uv_pipe_t*)handle)->data = NULL;
JL_UV_UNLOCK();
break;
case UV_TCP:
handle = malloc(sizeof(uv_tcp_t));
JL_UV_LOCK();
if ((err = uv_tcp_init(jl_io_loop, (uv_tcp_t*)handle))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_tcp_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
if ((err = uv_tcp_open((uv_tcp_t*)handle, (uv_os_sock_t)fd))) {
// JL_UV_UNLOCK() equivalent is done during unwinding
jl_errorf("error initializing %s in uv_tcp_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err);
}
((uv_tcp_t*)handle)->data = NULL;
JL_UV_UNLOCK();
break;
}
return handle;
Expand Down Expand Up @@ -658,7 +670,7 @@ void _julia_init(JL_IMAGE_SEARCH rel)
ios_set_io_wait_func = jl_set_io_wait;
jl_io_loop = uv_default_loop(); // this loop will internal events (spawning process etc.),
// best to call this first, since it also initializes libuv
jl_init_signal_async();
jl_init_uv();
restore_signals();

jl_resolve_sysimg_location(rel);
Expand Down
Loading

0 comments on commit ad40952

Please sign in to comment.