diff --git a/base/event.jl b/base/event.jl index ecd1db5525c2b..603586014e3ee 100644 --- a/base/event.jl +++ b/base/event.jl @@ -245,8 +245,8 @@ mutable struct Timer associate_julia_struct(this.handle, this) finalizer(uvfinalize, this) - ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop()) - ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), + ccall(:jl_uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop()) + ccall(:jl_uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), this, uv_jl_timercb::Ptr{Cvoid}, UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000))) return this @@ -266,7 +266,7 @@ isopen(t::Union{Timer, AsyncCondition}) = t.isopen function close(t::Union{Timer, AsyncCondition}) if t.handle != C_NULL && isopen(t) t.isopen = false - isa(t, Timer) && ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), t) + isa(t, Timer) && ccall(:jl_uv_timer_stop, Cint, (Ptr{Cvoid},), t) ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t) end nothing diff --git a/base/file.jl b/base/file.jl index 0d0b4a4cdc271..b7f98c7323774 100644 --- a/base/file.jl +++ b/base/file.jl @@ -623,7 +623,7 @@ function readdir(path::AbstractString) uv_readdir_req = zeros(UInt8, ccall(:jl_sizeof_uv_fs_t, Int32, ())) # defined in sys.c, to call uv_fs_readdir, which sets errno on error. - err = ccall(:uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), + err = ccall(:jl_uv_fs_scandir, Int32, (Ptr{Cvoid}, Ptr{UInt8}, Cstring, Cint, Ptr{Cvoid}), eventloop(), uv_readdir_req, path, 0, C_NULL) err < 0 && throw(SystemError("unable to read directory $path", -err)) #uv_error("unable to read directory $path", err) @@ -808,7 +808,7 @@ Return the target location a symbolic link `path` points to. function readlink(path::AbstractString) req = Libc.malloc(_sizeof_uv_fs) try - ret = ccall(:uv_fs_readlink, Int32, + ret = ccall(:jl_uv_fs_readlink, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), eventloop(), req, path, C_NULL) if ret < 0 diff --git a/base/filesystem.jl b/base/filesystem.jl index dd0de04cab0ac..7c252099afa02 100644 --- a/base/filesystem.jl +++ b/base/filesystem.jl @@ -73,7 +73,7 @@ function open(path::AbstractString, flags::Integer, mode::Integer=0) req = Libc.malloc(_sizeof_uv_fs) local handle try - ret = ccall(:uv_fs_open, Int32, + ret = ccall(:jl_uv_fs_open, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32, Int32, Ptr{Cvoid}), eventloop(), req, path, flags, mode, C_NULL) handle = ccall(:jl_uv_fs_result, Cssize_t, (Ptr{Cvoid},), req) @@ -131,7 +131,7 @@ write(f::File, c::UInt8) = write(f, Ref{UInt8}(c)) function truncate(f::File, n::Integer) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:uv_fs_ftruncate, Int32, + err = ccall(:jl_uv_fs_ftruncate, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int64, Ptr{Cvoid}), eventloop(), req, f.handle, n, C_NULL) Libc.free(req) @@ -142,7 +142,7 @@ end function futime(f::File, atime::Float64, mtime::Float64) check_open(f) req = Libc.malloc(_sizeof_uv_fs) - err = ccall(:uv_fs_futime, Int32, + err = ccall(:jl_uv_fs_futime, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Float64, Float64, Ptr{Cvoid}), eventloop(), req, f.handle, atime, mtime, C_NULL) Libc.free(req) diff --git a/base/stream.jl b/base/stream.jl index 6bf914fc05d77..8d90bd8be9ead 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -573,7 +573,7 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) ((bytesavailable(stream.buffer) >= stream.throttle) || (bytesavailable(stream.buffer) >= stream.buffer.maxsize))) # save cycles by stopping kernel notifications from arriving - ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) stream.status = StatusOpen end nothing @@ -712,7 +712,7 @@ function start_reading(stream::LibuvStream) # libuv may call the alloc callback immediately # for a TTY on Windows, so ensure the status is set first stream.status = StatusActive - ret = ccall(:uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), + ret = ccall(:jl_uv_read_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), stream, uv_jl_alloc_buf::Ptr{Cvoid}, uv_jl_readcb::Ptr{Cvoid}) return ret elseif stream.status == StatusPaused @@ -734,7 +734,7 @@ if Sys.iswindows() function stop_reading(stream::LibuvStream) if stream.status == StatusActive stream.status = StatusOpen - ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) + ccall(:jl_uv_read_stop, Cint, (Ptr{Cvoid},), stream) end nothing end diff --git a/src/init.c b/src/init.c index dd7924823efae..971732d2a92c9 100644 --- a/src/init.c +++ b/src/init.c @@ -157,9 +157,11 @@ static void jl_uv_exitcleanup_add(uv_handle_t *handle, struct uv_shutdown_queue struct uv_shutdown_queue_item *item = (struct uv_shutdown_queue_item*)malloc(sizeof(struct uv_shutdown_queue_item)); item->h = handle; item->next = NULL; + JL_UV_LOCK(); if (queue->last) queue->last->next = item; if (!queue->first) queue->first = item; queue->last = item; + JL_UV_UNLOCK(); } static void jl_uv_exitcleanup_walk(uv_handle_t *handle, void *arg) @@ -259,6 +261,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode) } struct uv_shutdown_queue queue = {NULL, NULL}; + JL_UV_LOCK(); uv_walk(loop, jl_uv_exitcleanup_walk, &queue); struct uv_shutdown_queue_item *item = queue.first; if (ptls->current_task != NULL) { @@ -288,6 +291,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode) // force libuv to spin until everything has finished closing loop->stop_flag = 0; + JL_UV_UNLOCK(); while (uv_run(loop, UV_RUN_DEFAULT)) { } // TODO: Destroy threads @@ -406,23 +410,31 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable) break; case UV_NAMED_PIPE: handle = malloc(sizeof(uv_pipe_t)); + JL_UV_LOCK(); if ((err = uv_pipe_init(jl_io_loop, (uv_pipe_t*)handle, 0))) { + // JL_UV_UNLOCK() equivalent is done during unwinding jl_errorf("error initializing %s in uv_pipe_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } if ((err = uv_pipe_open((uv_pipe_t*)handle, fd))) { + // JL_UV_UNLOCK() equivalent is done during unwinding jl_errorf("error initializing %s in uv_pipe_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } ((uv_pipe_t*)handle)->data = NULL; + JL_UV_UNLOCK(); break; case UV_TCP: handle = malloc(sizeof(uv_tcp_t)); + JL_UV_LOCK(); if ((err = uv_tcp_init(jl_io_loop, (uv_tcp_t*)handle))) { + // JL_UV_UNLOCK() equivalent is done during unwinding jl_errorf("error initializing %s in uv_tcp_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } if ((err = uv_tcp_open((uv_tcp_t*)handle, (uv_os_sock_t)fd))) { + // JL_UV_UNLOCK() equivalent is done during unwinding jl_errorf("error initializing %s in uv_tcp_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } ((uv_tcp_t*)handle)->data = NULL; + JL_UV_UNLOCK(); break; } return handle; @@ -658,7 +670,7 @@ void _julia_init(JL_IMAGE_SEARCH rel) ios_set_io_wait_func = jl_set_io_wait; jl_io_loop = uv_default_loop(); // this loop will internal events (spawning process etc.), // best to call this first, since it also initializes libuv - jl_init_signal_async(); + jl_init_uv(); restore_signals(); jl_resolve_sysimg_location(rel); diff --git a/src/jl_uv.c b/src/jl_uv.c index 93d850e1cfd7d..76dd01a6f68d3 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -65,6 +65,14 @@ void jl_init_signal_async(void) } #endif +jl_mutex_t jl_uv_mutex; + +void jl_init_uv(void) +{ + jl_init_signal_async(); + JL_MUTEX_INIT(&jl_uv_mutex); // a file-scope initializer can be used instead +} + void jl_uv_call_close_callback(jl_value_t *val) { jl_value_t *args[2]; @@ -89,6 +97,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle) // also let the client app do its own cleanup if (handle->type != UV_FILE && handle->data) { size_t last_age = jl_get_ptls_states()->world_age; + // TODO: data race on jl_world_counter across many files, to be fixed in a separate revision jl_get_ptls_states()->world_age = jl_world_counter; jl_uv_call_close_callback((jl_value_t*)handle->data); jl_get_ptls_states()->world_age = last_age; @@ -149,6 +158,7 @@ void jl_uv_flush(uv_stream_t *stream) stream->type != UV_TCP && stream->type != UV_NAMED_PIPE) return; + JL_UV_LOCK(); while (uv_is_writable(stream) && stream->write_queue_size != 0) { int fired = 0; uv_buf_t buf; @@ -156,15 +166,19 @@ void jl_uv_flush(uv_stream_t *stream) buf.len = 0; uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t)); write_req->data = (void*)&fired; - if (uv_write(write_req, stream, &buf, 1, uv_flush_callback) != 0) + if (uv_write(write_req, stream, &buf, 1, uv_flush_callback) != 0) { + JL_UV_UNLOCK(); return; + } while (!fired) { uv_run(uv_default_loop(), UV_RUN_DEFAULT); } } + JL_UV_UNLOCK(); } // getters and setters +// TODO: check if whoever calls these is thread-safe JL_DLLEXPORT int jl_uv_process_pid(uv_process_t *p) { return p->pid; } JL_DLLEXPORT void *jl_uv_process_data(uv_process_t *p) { return p->data; } JL_DLLEXPORT void *jl_uv_buf_base(const uv_buf_t *buf) { return buf->base; } @@ -181,10 +195,13 @@ JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; } JL_DLLEXPORT int jl_run_once(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop && ptls->tid == 0) { - loop->stop_flag = 0; + if (loop) { jl_gc_safepoint_(ptls); - return uv_run(loop, UV_RUN_ONCE); + JL_UV_LOCK(); + loop->stop_flag = 0; + int r = uv_run(loop,UV_RUN_ONCE); + JL_UV_UNLOCK(); + return r; } return 0; } @@ -192,25 +209,30 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop) JL_DLLEXPORT void jl_run_event_loop(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop && ptls->tid == 0) { - loop->stop_flag = 0; + if (loop) { jl_gc_safepoint_(ptls); - uv_run(loop, UV_RUN_DEFAULT); + JL_UV_LOCK(); + loop->stop_flag = 0; + uv_run(loop,UV_RUN_DEFAULT); + JL_UV_UNLOCK(); } } JL_DLLEXPORT int jl_process_events(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop && ptls->tid == 0) { - loop->stop_flag = 0; + if (loop) { jl_gc_safepoint_(ptls); - return uv_run(loop, UV_RUN_NOWAIT); + JL_UV_LOCK(); + loop->stop_flag = 0; + int r = uv_run(loop,UV_RUN_NOWAIT); + JL_UV_UNLOCK(); + return r; } return 0; } -static void jl_proc_exit_cleanup(uv_process_t *process, int64_t exit_status, int term_signal) +static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status, int term_signal) { uv_close((uv_handle_t*)process, (uv_close_cb)&free); } @@ -221,10 +243,10 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle) // take ownership of this handle, // so we can waitpid for the resource to exit and avoid leaving zombies assert(handle->data == NULL); // make sure Julia has forgotten about it already - ((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup; + ((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb; return; } - + JL_UV_LOCK(); if (handle->type == UV_FILE) { uv_fs_t req; jl_uv_file_t *fd = (jl_uv_file_t*)handle; @@ -233,6 +255,7 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle) fd->file = (uv_os_fd_t)(ssize_t)-1; } jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state) + JL_UV_UNLOCK(); return; } @@ -240,20 +263,24 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle) uv_write_t *req = (uv_write_t*)malloc(sizeof(uv_write_t)); req->handle = (uv_stream_t*)handle; jl_uv_flush_close_callback(req, 0); + JL_UV_UNLOCK(); return; } + // avoid double-closing the stream if (!uv_is_closing(handle)) { - // avoid double-closing the stream uv_close(handle, &jl_uv_closeHandle); } + JL_UV_UNLOCK(); } JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle) { + // avoid double-closing the stream if (!uv_is_closing(handle)) { - // avoid double-closing the stream + JL_UV_LOCK(); uv_close(handle, &jl_uv_closeHandle); + JL_UV_UNLOCK(); } } @@ -298,7 +325,10 @@ JL_DLLEXPORT int jl_spawn(char *name, char **argv, } } opts.exit_cb = cb; - return uv_spawn(loop, proc, &opts); + JL_UV_LOCK(); + int r = uv_spawn(loop, proc, &opts); + JL_UV_UNLOCK(); + return r; } #ifdef _OS_WINDOWS_ @@ -321,7 +351,7 @@ JL_DLLEXPORT int jl_fs_unlink(char *path) { uv_fs_t req; JL_SIGATOMIC_BEGIN(); - int ret = uv_fs_unlink(jl_io_loop, &req, path, NULL); + int ret = uv_fs_unlink(unused_uv_loop_arg, &req, path, NULL); uv_fs_req_cleanup(&req); JL_SIGATOMIC_END(); return ret; @@ -331,7 +361,7 @@ JL_DLLEXPORT int jl_fs_rename(const char *src_path, const char *dst_path) { uv_fs_t req; JL_SIGATOMIC_BEGIN(); - int ret = uv_fs_rename(jl_io_loop, &req, src_path, dst_path, NULL); + int ret = uv_fs_rename(unused_uv_loop_arg, &req, src_path, dst_path, NULL); uv_fs_req_cleanup(&req); JL_SIGATOMIC_END(); return ret; @@ -342,7 +372,7 @@ JL_DLLEXPORT int jl_fs_sendfile(uv_os_fd_t src_fd, uv_os_fd_t dst_fd, { uv_fs_t req; JL_SIGATOMIC_BEGIN(); - int ret = uv_fs_sendfile(jl_io_loop, &req, dst_fd, src_fd, + int ret = uv_fs_sendfile(unused_uv_loop_arg, &req, dst_fd, src_fd, in_offset, len, NULL); uv_fs_req_cleanup(&req); JL_SIGATOMIC_END(); @@ -352,7 +382,7 @@ JL_DLLEXPORT int jl_fs_sendfile(uv_os_fd_t src_fd, uv_os_fd_t dst_fd, JL_DLLEXPORT int jl_fs_symlink(char *path, char *new_path, int flags) { uv_fs_t req; - int ret = uv_fs_symlink(jl_io_loop, &req, path, new_path, flags, NULL); + int ret = uv_fs_symlink(unused_uv_loop_arg, &req, path, new_path, flags, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -360,7 +390,7 @@ JL_DLLEXPORT int jl_fs_symlink(char *path, char *new_path, int flags) JL_DLLEXPORT int jl_fs_chmod(char *path, int mode) { uv_fs_t req; - int ret = uv_fs_chmod(jl_io_loop, &req, path, mode, NULL); + int ret = uv_fs_chmod(unused_uv_loop_arg, &req, path, mode, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -368,7 +398,7 @@ JL_DLLEXPORT int jl_fs_chmod(char *path, int mode) JL_DLLEXPORT int jl_fs_chown(char *path, int uid, int gid) { uv_fs_t req; - int ret = uv_fs_chown(jl_io_loop, &req, path, uid, gid, NULL); + int ret = uv_fs_chown(unused_uv_loop_arg, &req, path, uid, gid, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -377,6 +407,7 @@ JL_DLLEXPORT int jl_fs_write(uv_os_fd_t handle, const char *data, size_t len, int64_t offset) { jl_ptls_t ptls = jl_get_ptls_states(); + // TODO: fix this cheating if (ptls->safe_restore || ptls->tid != 0) #ifdef _OS_WINDOWS_ return WriteFile(handle, data, len, NULL, NULL); @@ -389,7 +420,7 @@ JL_DLLEXPORT int jl_fs_write(uv_os_fd_t handle, const char *data, size_t len, buf[0].len = len; if (!jl_io_loop) jl_io_loop = uv_default_loop(); - int ret = uv_fs_write(jl_io_loop, &req, handle, buf, 1, offset, NULL); + int ret = uv_fs_write(unused_uv_loop_arg, &req, handle, buf, 1, offset, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -400,7 +431,7 @@ JL_DLLEXPORT int jl_fs_read(uv_os_fd_t handle, char *data, size_t len) uv_buf_t buf[1]; buf[0].base = data; buf[0].len = len; - int ret = uv_fs_read(jl_io_loop, &req, handle, buf, 1, -1, NULL); + int ret = uv_fs_read(unused_uv_loop_arg, &req, handle, buf, 1, -1, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -412,7 +443,7 @@ JL_DLLEXPORT int jl_fs_read_byte(uv_os_fd_t handle) uv_buf_t buf[1]; buf[0].base = (char*)&c; buf[0].len = 1; - int ret = uv_fs_read(jl_io_loop, &req, handle, buf, 1, -1, NULL); + int ret = uv_fs_read(unused_uv_loop_arg, &req, handle, buf, 1, -1, NULL); uv_fs_req_cleanup(&req); switch (ret) { case -1: return ret; @@ -427,7 +458,7 @@ JL_DLLEXPORT int jl_fs_read_byte(uv_os_fd_t handle) JL_DLLEXPORT int jl_fs_close(uv_os_fd_t handle) { uv_fs_t req; - int ret = uv_fs_close(jl_io_loop, &req, handle, NULL); + int ret = uv_fs_close(unused_uv_loop_arg, &req, handle, NULL); uv_fs_req_cleanup(&req); return ret; } @@ -438,8 +469,10 @@ JL_DLLEXPORT int jl_uv_write(uv_stream_t *stream, const char *data, size_t n, uv_buf_t buf[1]; buf[0].base = (char*)data; buf[0].len = n; + JL_UV_LOCK(); JL_SIGATOMIC_BEGIN(); int err = uv_write(uvw, stream, buf, 1, writecb); + JL_UV_UNLOCK(); JL_SIGATOMIC_END(); return err; } @@ -473,7 +506,7 @@ JL_DLLEXPORT void jl_uv_puts(uv_stream_t *stream, const char *str, size_t n) fd = ((jl_uv_file_t*)stream)->file; } - // Hack to make CoreIO thread-safer + // TODO: Hack to make CoreIO thread-safer jl_ptls_t ptls = jl_get_ptls_states(); if (ptls->tid != 0) { if (stream == JL_STDOUT) { @@ -504,8 +537,10 @@ JL_DLLEXPORT void jl_uv_puts(uv_stream_t *stream, const char *str, size_t n) buf[0].base = data; buf[0].len = n; req->data = NULL; + JL_UV_LOCK(); JL_SIGATOMIC_BEGIN(); int status = uv_write(req, stream, buf, 1, (uv_write_cb)jl_uv_writecb); + JL_UV_UNLOCK(); JL_SIGATOMIC_END(); if (status < 0) { jl_uv_writecb(req, status); @@ -611,6 +646,7 @@ JL_DLLEXPORT int jl_tcp_bind(uv_tcp_t *handle, uint16_t port, uint32_t host, addr.sin_port = port; addr.sin_addr.s_addr = host; addr.sin_family = AF_INET; + // TODO: do we need a lock here? return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); } @@ -622,6 +658,7 @@ JL_DLLEXPORT int jl_tcp_bind6(uv_tcp_t *handle, uint16_t port, void *host, addr.sin6_port = port; memcpy(&addr.sin6_addr, host, 16); addr.sin6_family = AF_INET6; + // TODO: do we need a lock here return uv_tcp_bind(handle, (struct sockaddr*)&addr, flags); } @@ -710,7 +747,10 @@ JL_DLLEXPORT int jl_udp_send(uv_udp_t *handle, uint16_t port, uint32_t host, buf[0].len = size; uv_udp_send_t *req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); req->data = handle->data; - return uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); + JL_UV_LOCK(); + int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); + JL_UV_UNLOCK(); + return r; } JL_DLLEXPORT int jl_udp_send6(uv_udp_t *handle, uint16_t port, void *host, @@ -726,7 +766,10 @@ JL_DLLEXPORT int jl_udp_send6(uv_udp_t *handle, uint16_t port, void *host, buf[0].len = size; uv_udp_send_t *req = (uv_udp_send_t *) malloc(sizeof(uv_udp_send_t)); req->data = handle->data; - return uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); + JL_UV_LOCK(); + int r = uv_udp_send(req, handle, buf, 1, (struct sockaddr*)&addr, cb); + JL_UV_UNLOCK(); + return r; } JL_DLLEXPORT int jl_uv_sizeof_interface_address(void) @@ -786,7 +829,10 @@ JL_DLLEXPORT int jl_getnameinfo6(uv_loop_t *loop, uv_getnameinfo_t *req, addr.sin6_port = port; req->data = NULL; - return uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); + JL_UV_LOCK(); + int r = uv_getnameinfo(loop, req, uvcb, (struct sockaddr*)&addr, flags); + JL_UV_UNLOCK(); + return r; } @@ -849,7 +895,10 @@ JL_DLLEXPORT int jl_tcp4_connect(uv_tcp_t *handle,uint32_t host, uint16_t port, addr.sin_family = AF_INET; addr.sin_addr.s_addr = host; addr.sin_port = port; - return uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); + JL_UV_LOCK(); + int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); + JL_UV_UNLOCK(); + return r; } JL_DLLEXPORT int jl_tcp6_connect(uv_tcp_t *handle, void *host, uint16_t port, @@ -862,7 +911,10 @@ JL_DLLEXPORT int jl_tcp6_connect(uv_tcp_t *handle, void *host, uint16_t port, addr.sin6_family = AF_INET6; memcpy(&addr.sin6_addr, host, 16); addr.sin6_port = port; - return uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); + JL_UV_LOCK(); + int r = uv_tcp_connect(req,handle,(struct sockaddr*)&addr,cb); + JL_UV_UNLOCK(); + return r; } JL_DLLEXPORT int jl_connect_raw(uv_tcp_t *handle,struct sockaddr_storage *addr, @@ -870,7 +922,10 @@ JL_DLLEXPORT int jl_connect_raw(uv_tcp_t *handle,struct sockaddr_storage *addr, { uv_connect_t *req = (uv_connect_t*)malloc(sizeof(uv_connect_t)); req->data = 0; - return uv_tcp_connect(req,handle,(struct sockaddr*)addr,cb); + JL_UV_LOCK(); + int r = uv_tcp_connect(req,handle,(struct sockaddr*)addr,cb); + JL_UV_UNLOCK(); + return r; } #ifdef _OS_LINUX_ @@ -915,12 +970,20 @@ JL_DLLEXPORT int jl_tcp_reuseport(uv_tcp_t *handle) JL_DLLEXPORT int jl_uv_unix_fd_is_watched(int fd, uv_poll_t *handle, uv_loop_t *loop) { - if (fd >= loop->nwatchers) + JL_UV_LOCK(); + if (fd >= loop->nwatchers) { + JL_UV_UNLOCK(); return 0; - if (loop->watchers[fd] == NULL) + } + if (loop->watchers[fd] == NULL) { + JL_UV_UNLOCK(); return 0; - if (handle && loop->watchers[fd] == &handle->io_watcher) + } + if (handle && loop->watchers[fd] == &handle->io_watcher) { + JL_UV_UNLOCK(); return 0; + } + JL_UV_UNLOCK(); return 1; } @@ -981,6 +1044,7 @@ JL_DLLEXPORT int jl_tty_set_mode(uv_tty_t *handle, int mode) uv_tty_mode_t mode_enum = UV_TTY_MODE_NORMAL; if (mode) mode_enum = UV_TTY_MODE_RAW; + // TODO: do we need lock? return uv_tty_set_mode(handle, mode_enum); } @@ -1025,11 +1089,108 @@ JL_DLLEXPORT int jl_queue_work(work_cb_t work_func, void *work_args, void *work_ baton->notify_func = notify_func; baton->notify_idx = notify_idx; + JL_UV_LOCK(); uv_queue_work(jl_io_loop, &baton->req, jl_work_wrapper, jl_work_notifier); + JL_UV_UNLOCK(); return 0; } +JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop) +{ + JL_UV_LOCK(); + uv_stop(loop); + // TODO: use memory/compiler fence here instead of the lock + JL_UV_UNLOCK(); +} + +JL_DLLEXPORT void jl_uv_update_time(uv_loop_t* loop) +{ + JL_UV_LOCK(); + uv_update_time(loop); + JL_UV_UNLOCK(); +} + +JL_DLLEXPORT int jl_uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, + uint64_t timeout, uint64_t repeat) +{ + JL_UV_LOCK(); + int r = uv_timer_start(handle, cb, timeout, repeat); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_timer_stop(uv_timer_t* handle) +{ + JL_UV_LOCK(); + int r = uv_timer_stop(handle); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_fs_scandir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, + uv_fs_cb cb) +{ + JL_UV_LOCK(); + int r = uv_fs_scandir(loop, req, path, flags, cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_fs_readlink(uv_loop_t* loop, uv_fs_t* req, const char* path, + uv_fs_cb cb) +{ + JL_UV_LOCK(); + int r = uv_fs_readlink(loop, req, path, cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, + int mode, uv_fs_cb cb) +{ + JL_UV_LOCK(); + int r = uv_fs_open(loop, req, path, flags, mode, cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_fs_ftruncate(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, + int64_t offset, uv_fs_cb cb) +{ + JL_UV_LOCK(); + int r = uv_fs_ftruncate(loop, req, handle, offset, cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_fs_futime(uv_loop_t* loop, uv_fs_t* req, uv_os_fd_t handle, double atime, + double mtime, uv_fs_cb cb) +{ + JL_UV_LOCK(); + int r = uv_fs_futime(loop, req, handle, atime, mtime, cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) +{ + JL_UV_LOCK(); + int r = uv_read_start(handle, alloc_cb, read_cb); + JL_UV_UNLOCK(); + return r; +} + +JL_DLLEXPORT int jl_uv_read_stop(uv_stream_t* handle) +{ + JL_UV_LOCK(); + int r = uv_read_stop(handle); + JL_UV_UNLOCK(); + return r; +} + + #ifndef _OS_WINDOWS_ #if defined(__APPLE__) int uv___stream_fd(uv_stream_t *handle); diff --git a/src/julia_internal.h b/src/julia_internal.h index ad68867b46356..df627221ce6bc 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -4,6 +4,7 @@ #define JL_INTERNAL_H #include "options.h" +#include "locks.h" #include #if !defined(_MSC_VER) && !defined(__MINGW32__) #include @@ -108,6 +109,15 @@ static inline void jl_assume_(int cond) # define JL_USE_IFUNC 0 #endif +// If this is detected in a backtrace of segfault, it means the functions +// that use this value must be reworked into their async form with cb arg +// provided and with JL_UV_LOCK used around the calls +static uv_loop_t *const unused_uv_loop_arg = (uv_loop_t *)0xBAD10; + +extern jl_mutex_t jl_uv_mutex; +#define JL_UV_LOCK() JL_LOCK_NOGC(&jl_uv_mutex) +#define JL_UV_UNLOCK() JL_UNLOCK_NOGC(&jl_uv_mutex) + #ifdef __cplusplus extern "C" { #endif @@ -490,7 +500,7 @@ void jl_init_stack_limits(int ismaster, void **stack_hi, void **stack_lo); void jl_init_root_task(void *stack_lo, void *stack_hi); void jl_init_serializer(void); void jl_gc_init(void); -void jl_init_signal_async(void); +void jl_init_uv(void); void jl_init_debuginfo(void); void jl_init_thread_heap(jl_ptls_t ptls); @@ -850,6 +860,8 @@ int jl_array_store_unboxed(jl_value_t *el_type); JL_DLLEXPORT jl_value_t *(jl_array_data_owner)(jl_array_t *a); JL_DLLEXPORT int jl_array_isassigned(jl_array_t *a, size_t i); +JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop); + // -- synchronization utilities -- // extern jl_mutex_t typecache_lock; diff --git a/src/julia_threads.h b/src/julia_threads.h index bb5710d9ed4ca..1da831bafe4de 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -4,6 +4,7 @@ #ifndef JL_THREADS_H #define JL_THREADS_H +#include // threading ------------------------------------------------------------------ // WARNING: Threading support is incomplete and experimental diff --git a/src/sys.c b/src/sys.c index 6e4202edce464..ecb973e0c288f 100644 --- a/src/sys.c +++ b/src/sys.c @@ -133,7 +133,7 @@ JL_DLLEXPORT int32_t jl_stat(const char *path, char *statbuf) // Ideally one would use the statbuf for the storage in req, but // it's not clear that this is possible using libuv - ret = uv_fs_stat(uv_default_loop(), &req, path, NULL); + ret = uv_fs_stat(unused_uv_loop_arg, &req, path, NULL); if (ret == 0) memcpy(statbuf, req.ptr, sizeof(uv_stat_t)); uv_fs_req_cleanup(&req); @@ -145,7 +145,7 @@ JL_DLLEXPORT int32_t jl_lstat(const char *path, char *statbuf) uv_fs_t req; int ret; - ret = uv_fs_lstat(uv_default_loop(), &req, path, NULL); + ret = uv_fs_lstat(unused_uv_loop_arg, &req, path, NULL); if (ret == 0) memcpy(statbuf, req.ptr, sizeof(uv_stat_t)); uv_fs_req_cleanup(&req); @@ -157,7 +157,7 @@ JL_DLLEXPORT int32_t jl_fstat(uv_os_fd_t fd, char *statbuf) uv_fs_t req; int ret; - ret = uv_fs_fstat(uv_default_loop(), &req, fd, NULL); + ret = uv_fs_fstat(unused_uv_loop_arg, &req, fd, NULL); if (ret == 0) memcpy(statbuf, req.ptr, sizeof(uv_stat_t)); uv_fs_req_cleanup(&req);