diff --git a/src/julia_threads.h b/src/julia_threads.h index 8996d02a55aae..6dd2335e71b64 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -190,14 +190,12 @@ struct _jl_tls_states_t { // this is limited to the few places we do synchronous IO // we can make this more general (similar to defer_signal) if necessary volatile sig_atomic_t io_wait; -#ifndef _OS_WINDOWS_ - // These are only used on unix now - pthread_t system_id; - void *signal_stack; -#endif #ifdef _OS_WINDOWS_ int needs_resetstkoflw; +#else + void *signal_stack; #endif + unsigned long system_id; // execution of certain certain impure // statements is prohibited from certain // callbacks (such as generated functions) diff --git a/src/partr.c b/src/partr.c index 913a3d9816be2..6a7a6cf51ee15 100644 --- a/src/partr.c +++ b/src/partr.c @@ -311,16 +311,14 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) } -static void wake_thread(int16_t self, int16_t tid) +static void wake_thread(int16_t tid) { - if (self != tid) { - jl_ptls_t other = jl_all_tls_states[tid]; - int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); - if (state == sleeping) { - uv_mutex_lock(&other->sleep_lock); - uv_cond_signal(&other->wake_signal); - uv_mutex_unlock(&other->sleep_lock); - } + jl_ptls_t other = jl_all_tls_states[tid]; + int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); + if (state == sleeping) { + uv_mutex_lock(&other->sleep_lock); + uv_cond_signal(&other->wake_signal); + uv_mutex_unlock(&other->sleep_lock); } } @@ -345,34 +343,37 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { jl_ptls_t ptls = jl_get_ptls_states(); int16_t self = ptls->tid; + unsigned long system_self = jl_all_tls_states[self]->system_id; int16_t uvlock = jl_atomic_load_acquire(&jl_uv_mutex.owner); - if (tid == self) { + if (tid == self || tid == -1) { // we're already awake, but make sure we'll exit uv_run jl_atomic_store(&ptls->sleep_check_state, not_sleeping); - if (uvlock == self) + if (uvlock == system_self) uv_stop(jl_global_event_loop()); } #ifdef JULIA_ENABLE_THREADING else { + // something added to the sticky-queue: notify that thread + wake_thread(tid); + // check if we need to notify uv_run too + if (uvlock != system_self) + jl_wake_libuv(); + } + if (tid == -1) { // check if the other threads might be sleeping if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) { - if (tid == -1) { - // something added to the multi-queue: notify all threads - // in the future, we might want to instead wake some fraction of threads, - // and let each of those wake additional threads if they find work - int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping); - if (state == sleeping) { - for (tid = 0; tid < jl_n_threads; tid++) - wake_thread(self, tid); - } - } - else { - // something added to the sticky-queue: notify that thread - wake_thread(self, tid); + // something added to the multi-queue: notify all threads + // in the future, we might want to instead wake some fraction of threads, + // and let each of those wake additional threads if they find work + int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping); + if (state == sleeping) { + for (tid = 0; tid < jl_n_threads; tid++) + if (tid != self) + wake_thread(tid); + // check if we need to notify uv_run too + if (uvlock != system_self) + jl_wake_libuv(); } - // check if we need to notify uv_run too - if (uvlock != self) - jl_wake_libuv(); } } #endif diff --git a/src/signals-mach.c b/src/signals-mach.c index c1add709a3e6b..0e2fa03f3ad5e 100644 --- a/src/signals-mach.c +++ b/src/signals-mach.c @@ -31,7 +31,7 @@ void jl_mach_gc_end(void) int8_t gc_state = (int8_t)(item >> 8); jl_ptls_t ptls2 = jl_all_tls_states[tid]; jl_atomic_store_release(&ptls2->gc_state, gc_state); - thread_resume(pthread_mach_thread_np(ptls2->system_id)); + thread_resume(pthread_mach_thread_np((pthread_t)ptls2->system_id)); } suspended_threads.len = 0; } @@ -101,7 +101,7 @@ static void allocate_segv_handler() } pthread_attr_destroy(&attr); for (int16_t tid = 0;tid < jl_n_threads;tid++) { - attach_exception_port(pthread_mach_thread_np(jl_all_tls_states[tid]->system_id), 0); + attach_exception_port(pthread_mach_thread_np((pthread_t)jl_all_tls_states[tid]->system_id), 0); } } @@ -178,7 +178,7 @@ kern_return_t catch_exception_raise(mach_port_t exception_port, jl_ptls_t ptls2 = NULL; for (tid = 0;tid < jl_n_threads;tid++) { jl_ptls_t _ptls2 = jl_all_tls_states[tid]; - if (pthread_mach_thread_np(_ptls2->system_id) == thread) { + if (pthread_mach_thread_np((pthread_t)_ptls2->system_id) == thread) { ptls2 = _ptls2; break; } @@ -269,7 +269,7 @@ static void attach_exception_port(thread_port_t thread, int segv_only) static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx) { jl_ptls_t ptls2 = jl_all_tls_states[tid]; - mach_port_t tid_port = pthread_mach_thread_np(ptls2->system_id); + mach_port_t tid_port = pthread_mach_thread_np((pthread_t)ptls2->system_id); kern_return_t ret = thread_suspend(tid_port); HANDLE_MACH_ERROR("thread_suspend", ret); @@ -289,7 +289,7 @@ static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx) static void jl_thread_resume(int tid, int sig) { jl_ptls_t ptls2 = jl_all_tls_states[tid]; - mach_port_t thread = pthread_mach_thread_np(ptls2->system_id); + mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id); kern_return_t ret = thread_resume(thread); HANDLE_MACH_ERROR("thread_resume", ret); } @@ -299,7 +299,7 @@ static void jl_thread_resume(int tid, int sig) static void jl_try_deliver_sigint(void) { jl_ptls_t ptls2 = jl_all_tls_states[0]; - mach_port_t thread = pthread_mach_thread_np(ptls2->system_id); + mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id); kern_return_t ret = thread_suspend(thread); HANDLE_MACH_ERROR("thread_suspend", ret); @@ -328,7 +328,7 @@ static void jl_try_deliver_sigint(void) static void jl_exit_thread0(int exitstate) { jl_ptls_t ptls2 = jl_all_tls_states[0]; - mach_port_t thread = pthread_mach_thread_np(ptls2->system_id); + mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id); kern_return_t ret = thread_suspend(thread); HANDLE_MACH_ERROR("thread_suspend", ret); diff --git a/src/threading.c b/src/threading.c index caa477cdbfd12..88c5c5482742e 100644 --- a/src/threading.c +++ b/src/threading.c @@ -241,6 +241,7 @@ JL_DLLEXPORT int16_t jl_threadid(void) void jl_init_threadtls(int16_t tid) { jl_ptls_t ptls = jl_get_ptls_states(); + ptls->system_id = jl_thread_self(); seed_cong(&ptls->rngseed); #ifdef _OS_WINDOWS_ if (tid == 0) { @@ -251,8 +252,6 @@ void jl_init_threadtls(int16_t tid) hMainThread = INVALID_HANDLE_VALUE; } } -#else - ptls->system_id = pthread_self(); #endif assert(ptls->world_age == 0); ptls->world_age = 1; // OK to run Julia code on this thread diff --git a/test/runtests.jl b/test/runtests.jl index aa961ad7cb2f7..10dfdb6858e3a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -50,6 +50,7 @@ end # Base.compilecache only works from node 1, so precompile test is handled specially move_to_node1("precompile") move_to_node1("SharedArrays") +move_to_node1("threads") # Ensure things like consuming all kernel pipe memory doesn't interfere with other tests move_to_node1("stress") diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 172cee215c511..a2e20dee695cf 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -653,3 +653,15 @@ function pfib(n::Int) return pfib(n-1) + fetch(t)::Int end @test pfib(20) == 6765 + + +# scheduling wake/sleep test (#32511) +let timeout = 300 # this test should take about 1-10 seconds + t = Timer(timeout) do t + ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM) + end # set up a watchdog alarm + for _ = 1:10^5 + @threads for idx in 1:1024; #=nothing=# end + end + close(t) # stop the watchdog +end