Skip to content

Commit

Permalink
threads: fix scheduler variable confusion (JuliaLang#32551)
Browse files Browse the repository at this point in the history
gotta keep system vs runtime and global vs local straight!

fix JuliaLang#32511
  • Loading branch information
vtjnash authored and JeffBezanson committed Jul 16, 2019
1 parent ba03920 commit 5366148
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 41 deletions.
8 changes: 3 additions & 5 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,12 @@ struct _jl_tls_states_t {
// this is limited to the few places we do synchronous IO
// we can make this more general (similar to defer_signal) if necessary
volatile sig_atomic_t io_wait;
#ifndef _OS_WINDOWS_
// These are only used on unix now
pthread_t system_id;
void *signal_stack;
#endif
#ifdef _OS_WINDOWS_
int needs_resetstkoflw;
#else
void *signal_stack;
#endif
unsigned long system_id;
// execution of certain certain impure
// statements is prohibited from certain
// callbacks (such as generated functions)
Expand Down
55 changes: 28 additions & 27 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,14 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
}


static void wake_thread(int16_t self, int16_t tid)
static void wake_thread(int16_t tid)
{
if (self != tid) {
jl_ptls_t other = jl_all_tls_states[tid];
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
}
jl_ptls_t other = jl_all_tls_states[tid];
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
}
}

Expand All @@ -345,34 +343,37 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
{
jl_ptls_t ptls = jl_get_ptls_states();
int16_t self = ptls->tid;
unsigned long system_self = jl_all_tls_states[self]->system_id;
int16_t uvlock = jl_atomic_load_acquire(&jl_uv_mutex.owner);
if (tid == self) {
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
if (uvlock == self)
if (uvlock == system_self)
uv_stop(jl_global_event_loop());
}
#ifdef JULIA_ENABLE_THREADING
else {
// something added to the sticky-queue: notify that thread
wake_thread(tid);
// check if we need to notify uv_run too
if (uvlock != system_self)
jl_wake_libuv();
}
if (tid == -1) {
// check if the other threads might be sleeping
if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) {
if (tid == -1) {
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
if (state == sleeping) {
for (tid = 0; tid < jl_n_threads; tid++)
wake_thread(self, tid);
}
}
else {
// something added to the sticky-queue: notify that thread
wake_thread(self, tid);
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
if (state == sleeping) {
for (tid = 0; tid < jl_n_threads; tid++)
if (tid != self)
wake_thread(tid);
// check if we need to notify uv_run too
if (uvlock != system_self)
jl_wake_libuv();
}
// check if we need to notify uv_run too
if (uvlock != self)
jl_wake_libuv();
}
}
#endif
Expand Down
14 changes: 7 additions & 7 deletions src/signals-mach.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void jl_mach_gc_end(void)
int8_t gc_state = (int8_t)(item >> 8);
jl_ptls_t ptls2 = jl_all_tls_states[tid];
jl_atomic_store_release(&ptls2->gc_state, gc_state);
thread_resume(pthread_mach_thread_np(ptls2->system_id));
thread_resume(pthread_mach_thread_np((pthread_t)ptls2->system_id));
}
suspended_threads.len = 0;
}
Expand Down Expand Up @@ -101,7 +101,7 @@ static void allocate_segv_handler()
}
pthread_attr_destroy(&attr);
for (int16_t tid = 0;tid < jl_n_threads;tid++) {
attach_exception_port(pthread_mach_thread_np(jl_all_tls_states[tid]->system_id), 0);
attach_exception_port(pthread_mach_thread_np((pthread_t)jl_all_tls_states[tid]->system_id), 0);
}
}

Expand Down Expand Up @@ -178,7 +178,7 @@ kern_return_t catch_exception_raise(mach_port_t exception_port,
jl_ptls_t ptls2 = NULL;
for (tid = 0;tid < jl_n_threads;tid++) {
jl_ptls_t _ptls2 = jl_all_tls_states[tid];
if (pthread_mach_thread_np(_ptls2->system_id) == thread) {
if (pthread_mach_thread_np((pthread_t)_ptls2->system_id) == thread) {
ptls2 = _ptls2;
break;
}
Expand Down Expand Up @@ -269,7 +269,7 @@ static void attach_exception_port(thread_port_t thread, int segv_only)
static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx)
{
jl_ptls_t ptls2 = jl_all_tls_states[tid];
mach_port_t tid_port = pthread_mach_thread_np(ptls2->system_id);
mach_port_t tid_port = pthread_mach_thread_np((pthread_t)ptls2->system_id);

kern_return_t ret = thread_suspend(tid_port);
HANDLE_MACH_ERROR("thread_suspend", ret);
Expand All @@ -289,7 +289,7 @@ static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx)
static void jl_thread_resume(int tid, int sig)
{
jl_ptls_t ptls2 = jl_all_tls_states[tid];
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);
kern_return_t ret = thread_resume(thread);
HANDLE_MACH_ERROR("thread_resume", ret);
}
Expand All @@ -299,7 +299,7 @@ static void jl_thread_resume(int tid, int sig)
static void jl_try_deliver_sigint(void)
{
jl_ptls_t ptls2 = jl_all_tls_states[0];
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);

kern_return_t ret = thread_suspend(thread);
HANDLE_MACH_ERROR("thread_suspend", ret);
Expand Down Expand Up @@ -328,7 +328,7 @@ static void jl_try_deliver_sigint(void)
static void jl_exit_thread0(int exitstate)
{
jl_ptls_t ptls2 = jl_all_tls_states[0];
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);
kern_return_t ret = thread_suspend(thread);
HANDLE_MACH_ERROR("thread_suspend", ret);

Expand Down
3 changes: 1 addition & 2 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ JL_DLLEXPORT int16_t jl_threadid(void)
void jl_init_threadtls(int16_t tid)
{
jl_ptls_t ptls = jl_get_ptls_states();
ptls->system_id = jl_thread_self();
seed_cong(&ptls->rngseed);
#ifdef _OS_WINDOWS_
if (tid == 0) {
Expand All @@ -251,8 +252,6 @@ void jl_init_threadtls(int16_t tid)
hMainThread = INVALID_HANDLE_VALUE;
}
}
#else
ptls->system_id = pthread_self();
#endif
assert(ptls->world_age == 0);
ptls->world_age = 1; // OK to run Julia code on this thread
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ end
# Base.compilecache only works from node 1, so precompile test is handled specially
move_to_node1("precompile")
move_to_node1("SharedArrays")
move_to_node1("threads")
# Ensure things like consuming all kernel pipe memory doesn't interfere with other tests
move_to_node1("stress")

Expand Down
12 changes: 12 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,15 @@ function pfib(n::Int)
return pfib(n-1) + fetch(t)::Int
end
@test pfib(20) == 6765


# scheduling wake/sleep test (#32511)
let timeout = 300 # this test should take about 1-10 seconds
t = Timer(timeout) do t
ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM)
end # set up a watchdog alarm
for _ = 1:10^5
@threads for idx in 1:1024; #=nothing=# end
end
close(t) # stop the watchdog
end

0 comments on commit 5366148

Please sign in to comment.