diff --git a/base/task.jl b/base/task.jl index 7242350caccbc..d453157ea3f0c 100644 --- a/base/task.jl +++ b/base/task.jl @@ -444,12 +444,17 @@ end function enq_work(t::Task) (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") - tid = (t.sticky ? Threads.threadid(t) : 0) - if tid == 0 - tid = Threads.threadid() + if t.sticky + tid = Threads.threadid(t) + if tid == 0 + tid = Threads.threadid() + end + push!(Workqueues[tid], t) + else + tid = 0 + ccall(:jl_enqueue_task, Cvoid, (Any,), t) end - push!(Workqueues[tid], t) - tid == 1 && ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) + ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) return t end @@ -603,30 +608,32 @@ function trypoptask(W::StickyWorkqueue) end @noinline function poptaskref(W::StickyWorkqueue) - local task - while true - task = trypoptask(W) - task === nothing || break - if !Threads.in_threaded_loop[] && Threads.threadid() == 1 - if process_events(true) == 0 - task = trypoptask(W) - task === nothing || break - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() - end - else - if Threads.threadid() == 1 - process_events(false) - end - ccall(:jl_gc_safepoint, Cvoid, ()) - ccall(:jl_cpu_pause, Cvoid, ()) - end - end + gettask = () -> trypoptask(W) + task = ccall(:jl_task_get_next, Any, (Any,), gettask) + ## Below is a reference implementation for `jl_task_get_next`, which currently lives in C + #local task + #while true + # task = trypoptask(W) + # task === nothing || break + # if !Threads.in_threaded_loop[] && Threads.threadid() == 1 + # if process_events(true) == 0 + # task = trypoptask(W) + # task === nothing || break + # # if there are no active handles and no runnable tasks, just + # # wait for signals. + # pause() + # end + # else + # if Threads.threadid() == 1 + # process_events(false) + # end + # ccall(:jl_gc_safepoint, Cvoid, ()) + # ccall(:jl_cpu_pause, Cvoid, ()) + # end + #end return Ref(task) end - function wait() W = Workqueues[Threads.threadid()] reftask = poptaskref(W) diff --git a/src/Makefile b/src/Makefile index ceaaa6efe8231..ee3c6bbd1c2eb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -43,7 +43,7 @@ SRCS := \ jltypes gf typemap ast builtins module interpreter symbol \ dlload sys init task array dump staticdata toplevel jl_uv datatype \ simplevector APInt-C runtime_intrinsics runtime_ccall precompile \ - threadgroup threading stackwalk gc gc-debug gc-pages gc-stacks method \ + threading partr stackwalk gc gc-debug gc-pages gc-stacks method \ jlapi signal-handling safepoint jloptions timing subtype rtutils \ crc32c processor @@ -215,7 +215,7 @@ $(BUILDDIR)/gc-debug.o $(BUILDDIR)/gc-debug.dbg.obj: $(SRCDIR)/gc.h $(BUILDDIR)/gc-pages.o $(BUILDDIR)/gc-pages.dbg.obj: $(SRCDIR)/gc.h $(BUILDDIR)/signal-handling.o $(BUILDDIR)/signal-handling.dbg.obj: $(addprefix $(SRCDIR)/,signals-*.c) $(BUILDDIR)/dump.o $(BUILDDIR)/dump.dbg.obj: $(addprefix $(SRCDIR)/,common_symbols1.inc common_symbols2.inc) -$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h threadgroup.h) +$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h) $(addprefix $(BUILDDIR)/,APInt-C.o APInt-C.dbg.obj runtime_intrinsics.o runtime_intrinsics.dbg.obj): $(SRCDIR)/APInt-C.h # archive library file rules diff --git a/src/atomics.h b/src/atomics.h index 493f0297892bc..ebfc66bbd83f4 100644 --- a/src/atomics.h +++ b/src/atomics.h @@ -62,8 +62,12 @@ // the __atomic builtins or c11 atomics with GNU extension or c11 _Generic # define jl_atomic_compare_exchange(obj, expected, desired) \ __sync_val_compare_and_swap(obj, expected, desired) +# define jl_atomic_bool_compare_exchange(obj, expected, desired) \ + __sync_bool_compare_and_swap(obj, expected, desired) # define jl_atomic_exchange(obj, desired) \ __atomic_exchange_n(obj, desired, __ATOMIC_SEQ_CST) +# define jl_atomic_exchange_generic(obj, desired, orig)\ + __atomic_exchange(obj, desired, orig, __ATOMIC_SEQ_CST) # define jl_atomic_exchange_relaxed(obj, desired) \ __atomic_exchange_n(obj, desired, __ATOMIC_RELAXED) // TODO: Maybe add jl_atomic_compare_exchange_weak for spin lock @@ -115,6 +119,7 @@ jl_atomic_fetch_add(T *obj, T2 arg) { return (T)_InterlockedExchangeAdd64((volatile __int64*)obj, (__int64)arg); } +// TODO: jl_atomic_exchange_generic #define jl_atomic_fetch_add_relaxed(obj, arg) jl_atomic_fetch_add(obj, arg) // and @@ -200,6 +205,7 @@ jl_atomic_compare_exchange(volatile T *obj, T2 expected, T3 desired) return (T)_InterlockedCompareExchange64((volatile __int64*)obj, (__int64)desired, (__int64)expected); } +// TODO: jl_atomic_bool_compare_exchange // atomic exchange template static inline typename std::enable_if::type diff --git a/src/gc.c b/src/gc.c index 4d7b655d50534..09c348a09b905 100644 --- a/src/gc.c +++ b/src/gc.c @@ -1634,6 +1634,11 @@ STATIC_INLINE int gc_mark_queue_obj(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_ return (int)nptr; } +int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj) +{ + return gc_mark_queue_obj(gc_cache, sp, obj); +} + JL_DLLEXPORT int jl_gc_mark_queue_obj(jl_ptls_t ptls, jl_value_t *obj) { return gc_mark_queue_obj(&ptls->gc_cache, &ptls->gc_mark_sp, obj); @@ -2483,12 +2488,17 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception); } +void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp); + // mark the initial root set static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) { // modules gc_mark_queue_obj(gc_cache, sp, jl_main_module); + // tasks + jl_gc_mark_enqueued_tasks(gc_cache, sp); + // invisible builtin values if (jl_an_empty_vec_any != NULL) gc_mark_queue_obj(gc_cache, sp, jl_an_empty_vec_any); diff --git a/src/init.c b/src/init.c index 65094a059d190..dd7924823efae 100644 --- a/src/init.c +++ b/src/init.c @@ -825,6 +825,10 @@ void _julia_init(JL_IMAGE_SEARCH rel) ptls->world_age = last_age; } } + else { + // nthreads > 1 requires code in Base + jl_n_threads = 1; + } jl_start_threads(); // This needs to be after jl_start_threads diff --git a/src/julia.h b/src/julia.h index 2a5f092b5d681..e404c4787c1c1 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1662,6 +1662,8 @@ typedef struct _jl_task_t { // id of owning thread // does not need to be defined until the task runs int16_t tid; + /* for the multiqueue */ + int16_t prio; #ifdef JULIA_ENABLE_THREADING // This is statically initialized when the task is not holding any locks arraylist_t locks; diff --git a/src/julia_internal.h b/src/julia_internal.h index a185e60adf993..ad68867b46356 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -502,7 +502,6 @@ extern ssize_t jl_tls_offset; extern const int jl_tls_elf_support; void jl_init_threading(void); void jl_start_threads(void); -void jl_shutdown_threading(void); // Whether the GC is running extern char *jl_safepoint_pages; @@ -708,6 +707,24 @@ void jl_copy_excstack(jl_excstack_t *dest, jl_excstack_t *src) JL_NOTSAFEPOINT; // Returns time in nanosec JL_DLLEXPORT uint64_t jl_hrtime(void); +// congruential random number generator +// for a small amount of thread-local randomness +// we could just use libc:`rand()`, but we want to ensure this is fast +STATIC_INLINE void seed_cong(uint64_t *seed) +{ + *seed = rand(); +} +STATIC_INLINE void unbias_cong(uint64_t max, uint64_t *unbias) +{ + *unbias = UINT64_MAX - ((UINT64_MAX % max) + 1); +} +STATIC_INLINE uint64_t cong(uint64_t max, uint64_t unbias, uint64_t *seed) +{ + while ((*seed = 69069 * (*seed) + 362437) > unbias) + ; + return *seed % max; +} + // libuv stuff: JL_DLLEXPORT extern void *jl_dl_handle; JL_DLLEXPORT extern void *jl_RTLD_DEFAULT_handle; diff --git a/src/julia_threads.h b/src/julia_threads.h index a768ffea378fa..bb5710d9ed4ca 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -140,6 +140,8 @@ typedef struct _jl_excstack_t jl_excstack_t; struct _jl_tls_states_t { struct _jl_gcframe_t *pgcstack; size_t world_age; + int16_t tid; + uint64_t rngseed; volatile size_t *safepoint; // Whether it is safe to execute GC at the same time. #define JL_GC_STATE_WAITING 1 @@ -158,7 +160,6 @@ struct _jl_tls_states_t { size_t stacksize; jl_ucontext_t base_ctx; // base context of stack jl_jmp_buf *safe_restore; - int16_t tid; // Temp storage for exception thrown in signal handler. Not rooted. struct _jl_value_t *sig_exception; // Temporary backtrace buffer. Scanned for gc roots when bt_size > 0. diff --git a/src/locks.h b/src/locks.h index b030e8c20403f..bb53887164723 100644 --- a/src/locks.h +++ b/src/locks.h @@ -105,6 +105,22 @@ static inline void jl_mutex_lock(jl_mutex_t *lock) jl_gc_enable_finalizers(ptls, 0); } +static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock) +{ + unsigned long self = jl_thread_self(); + unsigned long owner = jl_atomic_load_acquire(&lock->owner); + if (owner == self) { + lock->count++; + return 1; + } + if (owner == 0 && + jl_atomic_compare_exchange(&lock->owner, 0, self) == 0) { + lock->count = 1; + return 1; + } + return 0; +} + /* Call this function for code that could be called from either a managed or an unmanaged thread */ static inline void jl_mutex_lock_maybe_nogc(jl_mutex_t *lock) diff --git a/src/options.h b/src/options.h index 5a4fc70a1f102..37b1a38401009 100644 --- a/src/options.h +++ b/src/options.h @@ -129,6 +129,25 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// partr -- parallel tasks runtime options ------------------------------------ + +// multiq + // number of heaps = MULTIQ_HEAP_C * nthreads +#define MULTIQ_HEAP_C 4 + // how many in each heap +#define MULTIQ_TASKS_PER_HEAP 129 + +// parfor + // tasks = niters / (GRAIN_K * nthreads) +#define GRAIN_K 4 + +// synchronization + // narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1 + // limit for number of recursive parfors +#define ARRIVERS_P 2 + // nreducers = narrivers * REDUCERS_FRAC +#define REDUCERS_FRAC 1 + // sanitizer defaults --------------------------------------------------------- diff --git a/src/partr.c b/src/partr.c new file mode 100644 index 0000000000000..6520c936da8f4 --- /dev/null +++ b/src/partr.c @@ -0,0 +1,316 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +#include +#include +#include +#include + +#include "julia.h" +#include "julia_internal.h" +#include "gc.h" +#include "threading.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define JULIA_ENABLE_PARTR + +#ifdef JULIA_ENABLE_THREADING + +// GC functions used +extern int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, + jl_gc_mark_sp_t *sp, jl_value_t *obj); + +// multiq +// --- + +/* a task heap */ +typedef struct taskheap_tag { + jl_mutex_t lock; + jl_task_t **tasks; + int16_t ntasks, prio; +} taskheap_t; + +/* multiqueue parameters */ +static const int16_t heap_d = 8; +static const int heap_c = 4; + +/* size of each heap */ +static const int tasks_per_heap = 8192; // TODO: this should be smaller by default, but growable! + +/* the multiqueue's heaps */ +static taskheap_t *heaps; +static int16_t heap_p; + +/* unbias state for the RNG */ +static uint64_t cong_unbias; + +/* for thread sleeping */ +uv_mutex_t sleep_lock; +uv_cond_t sleep_alarm; + + +/* multiq_init() + */ +static inline void multiq_init(void) +{ + heap_p = heap_c * jl_n_threads; + heaps = (taskheap_t *)calloc(heap_p, sizeof(taskheap_t)); + for (int16_t i = 0; i < heap_p; ++i) { + jl_mutex_init(&heaps[i].lock); + heaps[i].tasks = (jl_task_t **)calloc(tasks_per_heap, sizeof(jl_task_t*)); + heaps[i].ntasks = 0; + heaps[i].prio = INT16_MAX; + } + unbias_cong(heap_p, &cong_unbias); +} + + +/* sift_up() + */ +static inline void sift_up(taskheap_t *heap, int16_t idx) +{ + if (idx > 0) { + int16_t parent = (idx-1)/heap_d; + if (heap->tasks[idx]->prio < heap->tasks[parent]->prio) { + jl_task_t *t = heap->tasks[parent]; + heap->tasks[parent] = heap->tasks[idx]; + heap->tasks[idx] = t; + sift_up(heap, parent); + } + } +} + + +/* sift_down() + */ +static inline void sift_down(taskheap_t *heap, int16_t idx) +{ + if (idx < heap->ntasks) { + for (int16_t child = heap_d*idx + 1; + child < tasks_per_heap && child <= heap_d*idx + heap_d; + ++child) { + if (heap->tasks[child] + && heap->tasks[child]->prio < heap->tasks[idx]->prio) { + jl_task_t *t = heap->tasks[idx]; + heap->tasks[idx] = heap->tasks[child]; + heap->tasks[child] = t; + sift_down(heap, child); + } + } + } +} + + +/* multiq_insert() + */ +static inline int multiq_insert(jl_task_t *task, int16_t priority) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + uint64_t rn; + + task->prio = priority; + do { + rn = cong(heap_p, cong_unbias, &ptls->rngseed); + } while (!jl_mutex_trylock_nogc(&heaps[rn].lock)); + + if (heaps[rn].ntasks >= tasks_per_heap) { + jl_mutex_unlock_nogc(&heaps[rn].lock); + jl_error("multiq insertion failed, increase #tasks per heap"); + return -1; + } + + heaps[rn].tasks[heaps[rn].ntasks++] = task; + sift_up(&heaps[rn], heaps[rn].ntasks-1); + jl_mutex_unlock_nogc(&heaps[rn].lock); + int16_t prio = jl_atomic_load(&heaps[rn].prio); + if (task->prio < prio) + jl_atomic_compare_exchange(&heaps[rn].prio, prio, task->prio); + + return 0; +} + + +/* multiq_deletemin() + */ +static inline jl_task_t *multiq_deletemin(void) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + uint64_t rn1 = 0, rn2; + int16_t i, prio1, prio2; + jl_task_t *task; + + for (i = 0; i < heap_p; ++i) { + rn1 = cong(heap_p, cong_unbias, &ptls->rngseed); + rn2 = cong(heap_p, cong_unbias, &ptls->rngseed); + prio1 = jl_atomic_load(&heaps[rn1].prio); + prio2 = jl_atomic_load(&heaps[rn2].prio); + if (prio1 > prio2) { + prio1 = prio2; + rn1 = rn2; + } + else if (prio1 == prio2 && prio1 == INT16_MAX) + continue; + if (jl_mutex_trylock_nogc(&heaps[rn1].lock)) { + if (prio1 == heaps[rn1].prio) + break; + jl_mutex_unlock_nogc(&heaps[rn1].lock); + } + } + if (i == heap_p) + return NULL; + + task = heaps[rn1].tasks[0]; + heaps[rn1].tasks[0] = heaps[rn1].tasks[--heaps[rn1].ntasks]; + heaps[rn1].tasks[heaps[rn1].ntasks] = NULL; + prio1 = INT16_MAX; + if (heaps[rn1].ntasks > 0) { + sift_down(&heaps[rn1], 0); + prio1 = heaps[rn1].tasks[0]->prio; + } + jl_atomic_store(&heaps[rn1].prio, prio1); + jl_mutex_unlock_nogc(&heaps[rn1].lock); + + return task; +} + + + +// parallel task runtime +// --- + +// initialize the threading infrastructure +void jl_init_threadinginfra(void) +{ + /* initialize the synchronization trees pool and the multiqueue */ + multiq_init(); + + /* initialize the sleep mechanism */ + uv_mutex_init(&sleep_lock); + uv_cond_init(&sleep_alarm); +} + + +void JL_NORETURN jl_finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNROOTED); + +// thread function: used by all except the main thread +void jl_threadfun(void *arg) +{ + jl_threadarg_t *targ = (jl_threadarg_t*)arg; + + // initialize this thread (set tid, create heap, set up root task) + jl_init_threadtls(targ->tid); + void *stack_lo, *stack_hi; + jl_init_stack_limits(0, &stack_lo, &stack_hi); + jl_init_root_task(stack_lo, stack_hi); + + jl_ptls_t ptls = jl_get_ptls_states(); + jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0); + uv_barrier_wait(targ->barrier); + + // free the thread argument here + free(targ); + + (void)jl_gc_unsafe_enter(ptls); + jl_current_task->exception = jl_nothing; + jl_finish_task(jl_current_task, jl_nothing); // noreturn +} + +JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + /* ensure thread tid is awake if necessary */ + if (ptls->tid != tid && !_threadedregion && tid != -1) { + uv_mutex_lock(&sleep_lock); + uv_cond_broadcast(&sleep_alarm); // TODO: make this uv_cond_signal / just wake up correct thread + uv_mutex_unlock(&sleep_lock); + } + /* stop the event loop too, if on thread 1 and alerting thread 1 */ + if (ptls->tid == 0 && (tid == 0 || tid == -1)) + uv_stop(jl_global_event_loop()); +} + + +// enqueue the specified task for execution +JL_DLLEXPORT void jl_enqueue_task(jl_task_t *task) +{ + multiq_insert(task, task->prio); +} + + +// get the next runnable task from the multiq +static jl_task_t *get_next_task(jl_value_t *getsticky) +{ + jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1); + if (jl_typeis(task, jl_task_type)) + return task; + return multiq_deletemin(); +} + + +JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky) +{ + jl_ptls_t ptls = jl_get_ptls_states(); + jl_task_t *task; + + while (1) { + jl_gc_safepoint(); + task = get_next_task(getsticky); + if (task) + return task; + + if (ptls->tid == 0) { + if (!_threadedregion) { + if (jl_run_once(jl_global_event_loop()) == 0) { + task = get_next_task(getsticky); + if (task) + return task; +#ifdef _OS_WINDOWS_ + Sleep(INFINITE); +#else + pause(); +#endif + } + } + else { + jl_process_events(jl_global_event_loop()); + } + } + else { + int sleepnow = 0; + if (!_threadedregion) { + uv_mutex_lock(&sleep_lock); + if (!_threadedregion) { + sleepnow = 1; + } + else { + uv_mutex_unlock(&sleep_lock); + } + } + else { + jl_cpu_pause(); + } + if (sleepnow) { + int8_t gc_state = jl_gc_safe_enter(ptls); + uv_cond_wait(&sleep_alarm, &sleep_lock); + uv_mutex_unlock(&sleep_lock); + jl_gc_safe_leave(ptls, gc_state); + } + } + } +} + + +void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp) +{ + for (int16_t i = 0; i < heap_p; ++i) + for (int16_t j = 0; j < heaps[i].ntasks; ++j) + jl_gc_mark_queue_obj_explicit(gc_cache, sp, (jl_value_t *)heaps[i].tasks[j]); +} + +#endif // JULIA_ENABLE_THREADING + +#ifdef __cplusplus +} +#endif diff --git a/src/task.c b/src/task.c index 59ca7748215c5..9196f26998f22 100644 --- a/src/task.c +++ b/src/task.c @@ -488,13 +488,13 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->logstate = ptls->current_task->logstate; // there is no active exception handler available on this stack yet t->eh = NULL; - // TODO: allow non-sticky tasks - t->tid = ptls->tid; t->sticky = 1; t->gcstack = NULL; t->excstack = NULL; t->stkbuf = NULL; t->started = 0; + t->prio = -1; + t->tid = 0; #ifdef ENABLE_TIMINGS t->timing_stack = NULL; #endif diff --git a/src/threadgroup.c b/src/threadgroup.c deleted file mode 100644 index f2158423acc0e..0000000000000 --- a/src/threadgroup.c +++ /dev/null @@ -1,206 +0,0 @@ -// This file is a part of Julia. License is MIT: https://julialang.org/license - -/* - threading infrastructure - . threadgroup abstraction - . fork/join/barrier -*/ - -#include -#include - -#include "julia.h" -#include "julia_internal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#include "options.h" -#include "threadgroup.h" - -int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores, - uint8_t num_threads_per_core, - ti_threadgroup_t **newtg) -{ - int i; - ti_threadgroup_t *tg; - int num_threads = num_sockets * num_cores * num_threads_per_core; - char *cp; - - tg = (ti_threadgroup_t*)jl_malloc_aligned(sizeof(ti_threadgroup_t), 64); - tg->tid_map = (int16_t*)jl_malloc_aligned(num_threads * sizeof(int16_t), 64); - for (i = 0; i < num_threads; ++i) - tg->tid_map[i] = -1; - tg->num_sockets = num_sockets; - tg->num_cores = num_cores; - tg->num_threads_per_core = num_threads_per_core; - tg->num_threads = num_threads; - tg->added_threads = 0; - tg->thread_sense = (ti_thread_sense_t**) - jl_malloc_aligned(num_threads * sizeof(ti_thread_sense_t*), 64); - for (i = 0; i < num_threads; i++) - tg->thread_sense[i] = NULL; - jl_atomic_store_release(&tg->group_sense, 0); - - uv_mutex_init(&tg->alarm_lock); - uv_cond_init(&tg->alarm); - - tg->sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; - cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); - if (cp) { - if (!strncasecmp(cp, "infinite", 8)) - tg->sleep_threshold = 0; - else - tg->sleep_threshold = (uint64_t)strtol(cp, NULL, 10); - } - - *newtg = tg; - return 0; -} - -int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid) -{ - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg->tid_map[ext_tid] != -1) - return -2; - if (tg->added_threads == tg->num_threads) - return -3; - - tg->tid_map[ext_tid] = tg->added_threads++; - if (tgtid) *tgtid = tg->tid_map[ext_tid]; - - return 0; -} - -int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid) -{ - ti_thread_sense_t *ts; - - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg->thread_sense[tg->tid_map[ext_tid]] != NULL) - return -2; - if (tg->num_threads == 0) - return -3; - - ts = (ti_thread_sense_t*)jl_malloc_aligned(sizeof(ti_thread_sense_t), 64); - ts->sense = 1; - tg->thread_sense[tg->tid_map[ext_tid]] = ts; - - return 0; -} - -int ti_threadgroup_member(ti_threadgroup_t *tg, int16_t ext_tid, int16_t *tgtid) -{ - if (ext_tid < 0 || ext_tid >= tg->num_threads) - return -1; - if (tg == NULL) { - if (tgtid) *tgtid = -1; - return -2; - } - if (tg->tid_map[ext_tid] == -1) { - if (tgtid) *tgtid = -1; - return -3; - } - if (tgtid) *tgtid = tg->tid_map[ext_tid]; - - return 0; -} - -int ti_threadgroup_size(ti_threadgroup_t *tg, int16_t *tgsize) -{ - *tgsize = tg->num_threads; - return 0; -} - -int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid, void **bcast_val, int init) -{ - uint8_t *group_sense = &tg->group_sense; - int16_t tid = tg->tid_map[ext_tid]; - int thread_sense = tg->thread_sense[tid]->sense; - if (tid == 0) { - tg->envelope = bcast_val ? *bcast_val : NULL; - // synchronize `tg->envelope` and `tg->group_sense` - jl_atomic_store_release(group_sense, thread_sense); - - // if it's possible that threads are sleeping, signal them - if (tg->sleep_threshold) { - uv_mutex_lock(&tg->alarm_lock); - uv_cond_broadcast(&tg->alarm); - uv_mutex_unlock(&tg->alarm_lock); - } - } - else { - // spin up to threshold ns (count sheep), then sleep - uint64_t spin_ns; - uint64_t spin_start = 0; - // synchronize `tg->envelope` and `tg->group_sense` - while (jl_atomic_load_acquire(group_sense) != thread_sense) { - if (tg->sleep_threshold) { - if (!spin_start) { - // Lazily initialize spin_start since uv_hrtime is expensive - spin_start = uv_hrtime(); - continue; - } - spin_ns = uv_hrtime() - spin_start; - // In case uv_hrtime is not monotonic, we'll sleep earlier - if (init || spin_ns >= tg->sleep_threshold) { - uv_mutex_lock(&tg->alarm_lock); - if (jl_atomic_load_acquire(group_sense) != thread_sense) { - uv_cond_wait(&tg->alarm, &tg->alarm_lock); - } - uv_mutex_unlock(&tg->alarm_lock); - spin_start = 0; - init = 0; - continue; - } - } - jl_cpu_pause(); - } - if (bcast_val) - *bcast_val = tg->envelope; - } - - return 0; -} - -int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid) -{ - int *p_thread_sense = &tg->thread_sense[tg->tid_map[ext_tid]]->sense; - jl_atomic_store_release(p_thread_sense, !*p_thread_sense); - if (tg->tid_map[ext_tid] == 0) { - jl_ptls_t ptls = jl_get_ptls_states(); - int8_t group_sense = tg->group_sense; - for (int i = 1; i < tg->num_threads; ++i) { - while (jl_atomic_load_acquire(&tg->thread_sense[i]->sense) == group_sense) { - jl_gc_safepoint_(ptls); - jl_cpu_pause(); - } - } - } - - return 0; -} - -int ti_threadgroup_destroy(ti_threadgroup_t *tg) -{ - int i; - - uv_mutex_destroy(&tg->alarm_lock); - uv_cond_destroy(&tg->alarm); - - for (i = 0; i < tg->num_threads; i++) - jl_free_aligned(tg->thread_sense[i]); - jl_free_aligned(tg->thread_sense); - jl_free_aligned(tg->tid_map); - jl_free_aligned(tg); - - return 0; -} - -#ifdef __cplusplus -} -#endif diff --git a/src/threadgroup.h b/src/threadgroup.h deleted file mode 100644 index 82fc59785cd05..0000000000000 --- a/src/threadgroup.h +++ /dev/null @@ -1,44 +0,0 @@ -// This file is a part of Julia. License is MIT: https://julialang.org/license - -#ifndef JL_THREADGROUP_H -#define JL_THREADGROUP_H - -#include -#include "uv.h" - -// for the barrier -typedef struct { - int sense; -} ti_thread_sense_t; - -// thread group -typedef struct { - int16_t *tid_map, num_threads, added_threads; - uint8_t num_sockets, num_cores, num_threads_per_core; - - // fork/join/barrier - uint8_t group_sense; // Written only by master thread - ti_thread_sense_t **thread_sense; - void *envelope; - - // to let threads sleep - uv_mutex_t alarm_lock; - uv_cond_t alarm; - uint64_t sleep_threshold; -} ti_threadgroup_t; - -int ti_threadgroup_create(uint8_t num_sockets, uint8_t num_cores, - uint8_t num_threads_per_core, - ti_threadgroup_t **newtg); -int ti_threadgroup_addthread(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid); -int ti_threadgroup_initthread(ti_threadgroup_t *tg, int16_t ext_tid); -int ti_threadgroup_member(ti_threadgroup_t *tg, int16_t ext_tid, - int16_t *tgtid); -int ti_threadgroup_size(ti_threadgroup_t *tg, int16_t *tgsize); -int ti_threadgroup_fork(ti_threadgroup_t *tg, int16_t ext_tid, - void **bcast_val, int init); -int ti_threadgroup_join(ti_threadgroup_t *tg, int16_t ext_tid); -int ti_threadgroup_destroy(ti_threadgroup_t *tg); - -#endif /* THREADGROUP_H */ diff --git a/src/threading.c b/src/threading.c index 92c0eac214bc4..e6475ac376af7 100644 --- a/src/threading.c +++ b/src/threading.c @@ -1,18 +1,5 @@ // This file is a part of Julia. License is MIT: https://julialang.org/license -/* - threading infrastructure - . thread and threadgroup creation - . thread function - . invoke Julia function from multiple threads - -TODO: - . fix interface to properly support thread groups - . add queue per thread for tasks - . add reduction; reduce values returned from thread function - . make code generation thread-safe and remove the lock -*/ - #include #include #include @@ -47,7 +34,6 @@ extern "C" { #endif -#include "threadgroup.h" #include "threading.h" // The tls_states buffer: @@ -240,8 +226,7 @@ JL_DLLEXPORT JL_CONST_FUNC jl_ptls_t (jl_get_ptls_states)(void) } #endif -// thread ID -JL_DLLEXPORT int jl_n_threads; // # threads we're actually using +JL_DLLEXPORT int jl_n_threads; jl_ptls_t *jl_all_tls_states; // return calling thread's ID @@ -253,10 +238,20 @@ JL_DLLEXPORT int16_t jl_threadid(void) return ptls->tid; } -static void ti_initthread(int16_t tid) +void jl_init_threadtls(int16_t tid) { jl_ptls_t ptls = jl_get_ptls_states(); -#ifndef _OS_WINDOWS_ + seed_cong(&ptls->rngseed); +#ifdef _OS_WINDOWS_ + if (tid == 0) { + if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &hMainThread, 0, + FALSE, DUPLICATE_SAME_ACCESS)) { + jl_printf(JL_STDERR, "WARNING: failed to access handle to main thread\n"); + hMainThread = INVALID_HANDLE_VALUE; + } + } +#else ptls->system_id = pthread_self(); #endif assert(ptls->world_age == 0); @@ -293,159 +288,12 @@ static void ti_initthread(int16_t tid) jl_all_tls_states[tid] = ptls; } -static void ti_init_master_thread(void) -{ -#ifdef _OS_WINDOWS_ - if (!DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), - GetCurrentProcess(), &hMainThread, 0, - FALSE, DUPLICATE_SAME_ACCESS)) { - jl_printf(JL_STDERR, "WARNING: failed to access handle to main thread\n"); - hMainThread = INVALID_HANDLE_VALUE; - } -#endif - ti_initthread(0); -} - -// all threads call this function to run user code -static jl_value_t *ti_run_fun(jl_callptr_t fptr, jl_method_instance_t *mfunc, - jl_value_t **args, uint32_t nargs) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - JL_TRY { - fptr(mfunc, args, nargs); - } - JL_CATCH { - // Lock this output since we know it'll likely happen on multiple threads - static jl_mutex_t lock; - JL_LOCK_NOGC(&lock); - jl_jmp_buf *old_buf = ptls->safe_restore; - jl_jmp_buf buf; - if (!jl_setjmp(buf, 0)) { - // Set up the safe_restore context so that the printing uses the thread safe version - ptls->safe_restore = &buf; - jl_printf(JL_STDERR, "\nError thrown in threaded loop on thread %d: ", - (int)ptls->tid); - jl_static_show(JL_STDERR, jl_current_exception()); - } - ptls->safe_restore = old_buf; - JL_UNLOCK_NOGC(&lock); - } - return jl_nothing; -} - - // lock for code generation jl_mutex_t codegen_lock; jl_mutex_t typecache_lock; #ifdef JULIA_ENABLE_THREADING -// only one thread group for now -static ti_threadgroup_t *tgworld; - -// for broadcasting work to threads -static ti_threadwork_t threadwork; - -#if PROFILE_JL_THREADING -uint64_t prep_ns; -uint64_t *fork_ns; -uint64_t *user_ns; -uint64_t *join_ns; -#endif - -static uv_barrier_t thread_init_done; - -// thread function: used by all except the main thread -void ti_threadfun(void *arg) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - ti_threadarg_t *ta = (ti_threadarg_t *)arg; - ti_threadgroup_t *tg; - ti_threadwork_t *work; - - // initialize this thread (set tid, create heap, etc.) - ti_initthread(ta->tid); - void *stack_lo, *stack_hi; - jl_init_stack_limits(0, &stack_lo, &stack_hi); - - // set up tasking - jl_init_root_task(stack_lo, stack_hi); - - // set the thread-local tid and wait for a thread group - while (jl_atomic_load_acquire(&ta->state) == TI_THREAD_INIT) - jl_cpu_pause(); - - // Assuming the functions called below doesn't contain unprotected GC - // critical region. In general, the following part of this function - // shouldn't call any managed code without calling `jl_gc_unsafe_enter` - // first. - jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0); - uv_barrier_wait(&thread_init_done); - // initialize this thread in the thread group - tg = ta->tg; - ti_threadgroup_initthread(tg, ptls->tid); - - // free the thread argument here - free(ta); - - int init = 1; - - // work loop - for (; ;) { -#if PROFILE_JL_THREADING - uint64_t tstart = uv_hrtime(); -#endif - - ti_threadgroup_fork(tg, ptls->tid, (void **)&work, init); - init = 0; - JL_GC_PROMISE_ROOTED(work); - -#if PROFILE_JL_THREADING - uint64_t tfork = uv_hrtime(); - fork_ns[ptls->tid] += tfork - tstart; -#endif - - if (work) { - if (work->command == TI_THREADWORK_DONE) { - break; - } - else if (work->command == TI_THREADWORK_RUN) { - // TODO: return value? reduction? - // TODO: before we support getting return value from - // the work, and after we have proper GC transition - // support in the codegen and runtime we don't need to - // enter GC unsafe region when starting the work. - int8_t gc_state = jl_gc_unsafe_enter(ptls); - // This is probably always NULL for now - size_t last_age = ptls->world_age; - ptls->world_age = work->world_age; - ti_run_fun(work->fptr, work->mfunc, work->args, work->nargs); - ptls->world_age = last_age; - jl_gc_unsafe_leave(ptls, gc_state); - } - } - -#if PROFILE_JL_THREADING - uint64_t tuser = uv_hrtime(); - user_ns[ptls->tid] += tuser - tfork; -#endif - - ti_threadgroup_join(tg, ptls->tid); - -#if PROFILE_JL_THREADING - uint64_t tjoin = uv_hrtime(); - join_ns[ptls->tid] += tjoin - tuser; -#endif - - // TODO: - // nowait should skip the join, but confirm that fork is reentrant - } -} - -#if PROFILE_JL_THREADING -void ti_reset_timings(void); -#endif - ssize_t jl_tls_offset = -1; #ifdef JL_ELF_TLS_VARIANT @@ -556,36 +404,30 @@ void jl_init_threading(void) int max_threads = jl_cpu_threads(); jl_n_threads = JULIA_NUM_THREADS; cp = getenv(NUM_THREADS_NAME); - if (cp) { + if (cp) jl_n_threads = (uint64_t)strtol(cp, NULL, 10); - } if (jl_n_threads > max_threads) jl_n_threads = max_threads; if (jl_n_threads <= 0) jl_n_threads = 1; - jl_all_tls_states = (jl_ptls_t*)malloc(jl_n_threads * sizeof(void*)); + jl_all_tls_states = (jl_ptls_t*)calloc(jl_n_threads, sizeof(void*)); -#if PROFILE_JL_THREADING - // set up space for profiling information - fork_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - user_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - join_ns = (uint64_t*)jl_malloc_aligned(jl_n_threads * sizeof(uint64_t), 64); - ti_reset_timings(); -#endif + // initialize this thread (set tid, create heap, etc.) + jl_init_threadtls(0); - // initialize this master thread (set tid, create heap, etc.) - ti_init_master_thread(); + // initialize threading infrastructure + jl_init_threadinginfra(); } +static uv_barrier_t thread_init_done; + void jl_start_threads(void) { - jl_ptls_t ptls = jl_get_ptls_states(); int cpumasksize = uv_cpumask_size(); char *cp; int i, exclusive; uv_thread_t uvtid; - ti_threadarg_t **targs; if (cpumasksize < jl_n_threads) // also handles error case cpumasksize = jl_n_threads; char *mask = (char*)alloca(cpumasksize); @@ -611,219 +453,91 @@ void jl_start_threads(void) size_t nthreads = jl_n_threads; // create threads - targs = (ti_threadarg_t **)malloc((nthreads - 1) * sizeof (ti_threadarg_t *)); - uv_barrier_init(&thread_init_done, nthreads); - for (i = 0; i < nthreads - 1; ++i) { - targs[i] = (ti_threadarg_t *)malloc(sizeof (ti_threadarg_t)); - targs[i]->state = TI_THREAD_INIT; - targs[i]->tid = i + 1; - uv_thread_create(&uvtid, ti_threadfun, targs[i]); + for (i = 1; i < nthreads; ++i) { + jl_threadarg_t *t = (jl_threadarg_t*)malloc(sizeof(jl_threadarg_t)); // ownership will be passed to the thread + t->tid = i; + t->barrier = &thread_init_done; + uv_thread_create(&uvtid, jl_threadfun, t); if (exclusive) { - mask[i + 1] = 1; + mask[i] = 1; uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize); - mask[i + 1] = 0; + mask[i] = 0; } uv_thread_detach(&uvtid); } - // set up the world thread group - ti_threadgroup_create(1, nthreads, 1, &tgworld); - for (i = 0; i < nthreads; ++i) - ti_threadgroup_addthread(tgworld, i, NULL); - ti_threadgroup_initthread(tgworld, ptls->tid); - - // give the threads the world thread group; they will block waiting for fork - for (i = 0; i < nthreads - 1; ++i) { - targs[i]->tg = tgworld; - jl_atomic_store_release(&targs[i]->state, TI_THREAD_WORK); - } - uv_barrier_wait(&thread_init_done); - - // free the argument array; the threads will free their arguments - free(targs); } -// TODO: is this needed? where/when/how to call it? -void jl_shutdown_threading(void) -{ - jl_ptls_t ptls = jl_get_ptls_states(); - // stop the spinning threads by sending them a command - ti_threadwork_t *work = &threadwork; - - work->command = TI_THREADWORK_DONE; - ti_threadgroup_fork(tgworld, ptls->tid, (void **)&work, 0); - - sleep(1); - - // destroy the world thread group - ti_threadgroup_destroy(tgworld); - -#if PROFILE_JL_THREADING - jl_free_aligned(join_ns); - jl_free_aligned(user_ns); - jl_free_aligned(fork_ns); - fork_ns = user_ns = join_ns = NULL; #endif -} -// interface to user code: specialize and compile the user thread function -// and run it in all threads -JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) +unsigned volatile _threadedregion; // HACK: prevent the root task from sleeping + +// simple fork/join mode code +JL_DLLEXPORT void jl_threading_run(jl_value_t *func) { jl_ptls_t ptls = jl_get_ptls_states(); - // GC safe -#if PROFILE_JL_THREADING - uint64_t tstart = uv_hrtime(); -#endif - uint32_t nargs; - jl_value_t **args; - if (!jl_is_svec(_args)) { - nargs = 1; - args = &_args; - } - else { - nargs = jl_svec_len(_args); - args = jl_svec_data(_args); - } - int8_t gc_state = jl_gc_unsafe_enter(ptls); - - size_t world = jl_get_ptls_states()->world_age; - threadwork.command = TI_THREADWORK_RUN; - threadwork.mfunc = jl_lookup_generic(args, nargs, - jl_int32hash_fast(jl_return_address()), world); + size_t world = jl_world_counter; + jl_method_instance_t *mfunc = jl_lookup_generic(&func, 1, jl_int32hash_fast(jl_return_address()), world); // Ignore constant return value for now. - threadwork.fptr = jl_compile_method_internal(&threadwork.mfunc, world); - if (threadwork.fptr == jl_fptr_const_return) - return jl_nothing; - threadwork.args = args; - threadwork.nargs = nargs; - threadwork.ret = jl_nothing; - threadwork.world_age = world; - -#if PROFILE_JL_THREADING - uint64_t tcompile = uv_hrtime(); - prep_ns += (tcompile - tstart); -#endif - - // fork the world thread group - ti_threadwork_t *tw = &threadwork; - ti_threadgroup_fork(tgworld, ptls->tid, (void **)&tw, 0); - -#if PROFILE_JL_THREADING - uint64_t tfork = uv_hrtime(); - fork_ns[ptls->tid] += (tfork - tcompile); -#endif - - // this thread must do work too (TODO: reduction?) - JL_GC_PROMISE_ROOTED(threadwork.mfunc); - tw->ret = ti_run_fun(threadwork.fptr, threadwork.mfunc, args, nargs); - -#if PROFILE_JL_THREADING - uint64_t trun = uv_hrtime(); - user_ns[ptls->tid] += (trun - tfork); -#endif - - // wait for completion (TODO: nowait?) - ti_threadgroup_join(tgworld, ptls->tid); - -#if PROFILE_JL_THREADING - uint64_t tjoin = uv_hrtime(); - join_ns[ptls->tid] += (tjoin - trun); -#endif - - jl_gc_unsafe_leave(ptls, gc_state); - - return tw->ret; -} - -#if PROFILE_JL_THREADING - -void ti_reset_timings(void) -{ - int i; - prep_ns = 0; - for (i = 0; i < jl_n_threads; i++) - fork_ns[i] = user_ns[i] = join_ns[i] = 0; -} + jl_callptr_t fptr = jl_compile_method_internal(&mfunc, world); + if (fptr == jl_fptr_const_return) + return; -void ti_timings(uint64_t *times, uint64_t *min, uint64_t *max, uint64_t *avg) -{ - int i; - *min = UINT64_MAX; - *max = *avg = 0; - for (i = 0; i < jl_n_threads; i++) { - if (times[i] < *min) - *min = times[i]; - if (times[i] > *max) - *max = times[i]; - *avg += times[i]; + size_t nthreads = jl_n_threads; + jl_svec_t *ts = jl_alloc_svec(nthreads); + JL_GC_PUSH1(&ts); + jl_value_t *wait_func = jl_get_global(jl_base_module, jl_symbol("wait")); + jl_value_t *schd_func = jl_get_global(jl_base_module, jl_symbol("schedule")); + // create and schedule all tasks + _threadedregion += 1; + for (int i = 0; i < nthreads; i++) { + jl_value_t *args2[2]; + args2[0] = (jl_value_t*)jl_task_type; + args2[1] = func; + jl_task_t *t = (jl_task_t*)jl_apply(args2, 2); + jl_svecset(ts, i, t); + t->sticky = 1; + t->tid = i; + args2[0] = schd_func; + args2[1] = (jl_value_t*)t; + jl_apply(args2, 2); + if (i == 1) { + // let threads know work is coming (optimistic) + uv_mutex_lock(&sleep_lock); + uv_cond_broadcast(&sleep_alarm); + uv_mutex_unlock(&sleep_lock); + } } - *avg /= jl_n_threads; -} - -#define NS_TO_SECS(t) ((t) / (double)1e9) - -JL_DLLEXPORT void jl_threading_profile(void) -{ - if (!fork_ns) return; - - printf("\nti profile:\n"); - printf("prep: %g (%" PRIu64 ")\n", NS_TO_SECS(prep_ns), prep_ns); - - uint64_t min, max, avg; - ti_timings(fork_ns, &min, &max, &avg); - printf("fork: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); - ti_timings(user_ns, &min, &max, &avg); - printf("user: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); - ti_timings(join_ns, &min, &max, &avg); - printf("join: %g (%g - %g)\n", NS_TO_SECS(min), NS_TO_SECS(max), - NS_TO_SECS(avg)); -} - -#else //!PROFILE_JL_THREADING - -JL_DLLEXPORT void jl_threading_profile(void) -{ -} - -#endif //!PROFILE_JL_THREADING - -#else // !JULIA_ENABLE_THREADING - -JL_DLLEXPORT jl_value_t *jl_threading_run(jl_value_t *_args) -{ - uint32_t nargs; - jl_value_t **args; - if (!jl_is_svec(_args)) { - nargs = 1; - args = &_args; + if (nthreads > 2) { + // let threads know work is ready (guaranteed) + uv_mutex_lock(&sleep_lock); + uv_cond_broadcast(&sleep_alarm); + uv_mutex_unlock(&sleep_lock); } - else { - nargs = jl_svec_len(_args); - args = jl_svec_data(_args); + // join with all tasks + for (int i = 0; i < nthreads; i++) { + jl_value_t *t = jl_svecref(ts, i); + jl_value_t *args[2] = { wait_func, t }; + jl_apply(args, 2); } - jl_method_instance_t *mfunc = jl_lookup_generic(args, nargs, - jl_int32hash_fast(jl_return_address()), - jl_get_ptls_states()->world_age); - size_t world = jl_get_ptls_states()->world_age; - jl_callptr_t fptr = jl_compile_method_internal(&mfunc, world); - if (fptr == jl_fptr_const_return) - return jl_nothing; - return ti_run_fun(fptr, mfunc, args, nargs); + _threadedregion -= 1; + JL_GC_POP(); + jl_gc_unsafe_leave(ptls, gc_state); } + +#ifndef JULIA_ENABLE_THREADING + void jl_init_threading(void) { static jl_ptls_t _jl_all_tls_states; jl_all_tls_states = &_jl_all_tls_states; jl_n_threads = 1; - ti_init_master_thread(); + jl_init_threadtls(0); } void jl_start_threads(void) { } diff --git a/src/threading.h b/src/threading.h index 8c812ca3c2676..8dd158cb5c5aa 100644 --- a/src/threading.h +++ b/src/threading.h @@ -8,50 +8,29 @@ extern "C" { #endif -#include "threadgroup.h" #include "julia.h" #define PROFILE_JL_THREADING 0 -// thread ID -extern jl_ptls_t *jl_all_tls_states; -extern JL_DLLEXPORT int jl_n_threads; // # threads we're actually using - -// thread state -enum { - TI_THREAD_INIT, - TI_THREAD_WORK -}; - -// passed to thread function -typedef struct { - int16_t volatile state; - int16_t tid; - ti_threadgroup_t *tg; -} ti_threadarg_t; - -// commands to thread function -enum { - TI_THREADWORK_DONE, - TI_THREADWORK_RUN -}; - -// work command to thread function -typedef struct { - uint8_t command; - jl_method_instance_t *mfunc; - jl_callptr_t fptr; - jl_value_t **args; - uint32_t nargs; - jl_value_t *ret; - size_t world_age; -} ti_threadwork_t; - -// thread function -void ti_threadfun(void *arg); - -// helpers for thread function -jl_value_t *ti_runthread(jl_function_t *f, jl_svec_t *args, size_t nargs); +extern jl_ptls_t *jl_all_tls_states; /* thread local storage */ +extern JL_DLLEXPORT int jl_n_threads; /* # threads we're actually using */ +extern volatile unsigned _threadedregion; // HACK: prevent tasks from sleeping in threaded regions + +extern uv_mutex_t sleep_lock; +extern uv_cond_t sleep_alarm; + +typedef struct _jl_threadarg_t { + int16_t tid; + uv_barrier_t *barrier; + void *arg; +} jl_threadarg_t; + +// each thread must initialize its TLS +void jl_init_threadtls(int16_t tid); + +// provided by a threading infrastructure +void jl_init_threadinginfra(void); +void jl_threadfun(void *arg); #ifdef __cplusplus }