Skip to content

Commit

Permalink
Merge pull request #31398 from JuliaLang/jn/kp/partr2
Browse files Browse the repository at this point in the history
import partr code, allow using it in threaded loops
  • Loading branch information
JeffBezanson committed Mar 22, 2019
2 parents a84970d + d9d8d4c commit 6a28aa9
Show file tree
Hide file tree
Showing 16 changed files with 527 additions and 686 deletions.
59 changes: 33 additions & 26 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,17 @@ end

function enq_work(t::Task)
(t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable")
tid = (t.sticky ? Threads.threadid(t) : 0)
if tid == 0
tid = Threads.threadid()
if t.sticky
tid = Threads.threadid(t)
if tid == 0
tid = Threads.threadid()
end
push!(Workqueues[tid], t)
else
tid = 0
ccall(:jl_enqueue_task, Cvoid, (Any,), t)
end
push!(Workqueues[tid], t)
tid == 1 && ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
return t
end

Expand Down Expand Up @@ -603,30 +608,32 @@ function trypoptask(W::StickyWorkqueue)
end

@noinline function poptaskref(W::StickyWorkqueue)
local task
while true
task = trypoptask(W)
task === nothing || break
if !Threads.in_threaded_loop[] && Threads.threadid() == 1
if process_events(true) == 0
task = trypoptask(W)
task === nothing || break
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
if Threads.threadid() == 1
process_events(false)
end
ccall(:jl_gc_safepoint, Cvoid, ())
ccall(:jl_cpu_pause, Cvoid, ())
end
end
gettask = () -> trypoptask(W)
task = ccall(:jl_task_get_next, Any, (Any,), gettask)
## Below is a reference implementation for `jl_task_get_next`, which currently lives in C
#local task
#while true
# task = trypoptask(W)
# task === nothing || break
# if !Threads.in_threaded_loop[] && Threads.threadid() == 1
# if process_events(true) == 0
# task = trypoptask(W)
# task === nothing || break
# # if there are no active handles and no runnable tasks, just
# # wait for signals.
# pause()
# end
# else
# if Threads.threadid() == 1
# process_events(false)
# end
# ccall(:jl_gc_safepoint, Cvoid, ())
# ccall(:jl_cpu_pause, Cvoid, ())
# end
#end
return Ref(task)
end


function wait()
W = Workqueues[Threads.threadid()]
reftask = poptaskref(W)
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ SRCS := \
jltypes gf typemap ast builtins module interpreter symbol \
dlload sys init task array dump staticdata toplevel jl_uv datatype \
simplevector APInt-C runtime_intrinsics runtime_ccall precompile \
threadgroup threading stackwalk gc gc-debug gc-pages gc-stacks method \
threading partr stackwalk gc gc-debug gc-pages gc-stacks method \
jlapi signal-handling safepoint jloptions timing subtype rtutils \
crc32c processor

Expand Down Expand Up @@ -215,7 +215,7 @@ $(BUILDDIR)/gc-debug.o $(BUILDDIR)/gc-debug.dbg.obj: $(SRCDIR)/gc.h
$(BUILDDIR)/gc-pages.o $(BUILDDIR)/gc-pages.dbg.obj: $(SRCDIR)/gc.h
$(BUILDDIR)/signal-handling.o $(BUILDDIR)/signal-handling.dbg.obj: $(addprefix $(SRCDIR)/,signals-*.c)
$(BUILDDIR)/dump.o $(BUILDDIR)/dump.dbg.obj: $(addprefix $(SRCDIR)/,common_symbols1.inc common_symbols2.inc)
$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h threadgroup.h)
$(addprefix $(BUILDDIR)/,threading.o threading.dbg.obj gc.o gc.dbg.obj init.c init.dbg.obj task.o task.dbg.obj): $(addprefix $(SRCDIR)/,threading.h)
$(addprefix $(BUILDDIR)/,APInt-C.o APInt-C.dbg.obj runtime_intrinsics.o runtime_intrinsics.dbg.obj): $(SRCDIR)/APInt-C.h

# archive library file rules
Expand Down
6 changes: 6 additions & 0 deletions src/atomics.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@
// the __atomic builtins or c11 atomics with GNU extension or c11 _Generic
# define jl_atomic_compare_exchange(obj, expected, desired) \
__sync_val_compare_and_swap(obj, expected, desired)
# define jl_atomic_bool_compare_exchange(obj, expected, desired) \
__sync_bool_compare_and_swap(obj, expected, desired)
# define jl_atomic_exchange(obj, desired) \
__atomic_exchange_n(obj, desired, __ATOMIC_SEQ_CST)
# define jl_atomic_exchange_generic(obj, desired, orig)\
__atomic_exchange(obj, desired, orig, __ATOMIC_SEQ_CST)
# define jl_atomic_exchange_relaxed(obj, desired) \
__atomic_exchange_n(obj, desired, __ATOMIC_RELAXED)
// TODO: Maybe add jl_atomic_compare_exchange_weak for spin lock
Expand Down Expand Up @@ -115,6 +119,7 @@ jl_atomic_fetch_add(T *obj, T2 arg)
{
return (T)_InterlockedExchangeAdd64((volatile __int64*)obj, (__int64)arg);
}
// TODO: jl_atomic_exchange_generic
#define jl_atomic_fetch_add_relaxed(obj, arg) jl_atomic_fetch_add(obj, arg)

// and
Expand Down Expand Up @@ -200,6 +205,7 @@ jl_atomic_compare_exchange(volatile T *obj, T2 expected, T3 desired)
return (T)_InterlockedCompareExchange64((volatile __int64*)obj,
(__int64)desired, (__int64)expected);
}
// TODO: jl_atomic_bool_compare_exchange
// atomic exchange
template<typename T, typename T2>
static inline typename std::enable_if<sizeof(T) == 1, T>::type
Expand Down
10 changes: 10 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1634,6 +1634,11 @@ STATIC_INLINE int gc_mark_queue_obj(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_
return (int)nptr;
}

int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj)
{
return gc_mark_queue_obj(gc_cache, sp, obj);
}

JL_DLLEXPORT int jl_gc_mark_queue_obj(jl_ptls_t ptls, jl_value_t *obj)
{
return gc_mark_queue_obj(&ptls->gc_cache, &ptls->gc_mark_sp, obj);
Expand Down Expand Up @@ -2483,12 +2488,17 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
}

void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);

// mark the initial root set
static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
// modules
gc_mark_queue_obj(gc_cache, sp, jl_main_module);

// tasks
jl_gc_mark_enqueued_tasks(gc_cache, sp);

// invisible builtin values
if (jl_an_empty_vec_any != NULL)
gc_mark_queue_obj(gc_cache, sp, jl_an_empty_vec_any);
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ void _julia_init(JL_IMAGE_SEARCH rel)
ptls->world_age = last_age;
}
}
else {
// nthreads > 1 requires code in Base
jl_n_threads = 1;
}
jl_start_threads();

// This needs to be after jl_start_threads
Expand Down
2 changes: 2 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,8 @@ typedef struct _jl_task_t {
// id of owning thread
// does not need to be defined until the task runs
int16_t tid;
/* for the multiqueue */
int16_t prio;
#ifdef JULIA_ENABLE_THREADING
// This is statically initialized when the task is not holding any locks
arraylist_t locks;
Expand Down
19 changes: 18 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,6 @@ extern ssize_t jl_tls_offset;
extern const int jl_tls_elf_support;
void jl_init_threading(void);
void jl_start_threads(void);
void jl_shutdown_threading(void);

// Whether the GC is running
extern char *jl_safepoint_pages;
Expand Down Expand Up @@ -708,6 +707,24 @@ void jl_copy_excstack(jl_excstack_t *dest, jl_excstack_t *src) JL_NOTSAFEPOINT;
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void);

// congruential random number generator
// for a small amount of thread-local randomness
// we could just use libc:`rand()`, but we want to ensure this is fast
STATIC_INLINE void seed_cong(uint64_t *seed)
{
*seed = rand();
}
STATIC_INLINE void unbias_cong(uint64_t max, uint64_t *unbias)
{
*unbias = UINT64_MAX - ((UINT64_MAX % max) + 1);
}
STATIC_INLINE uint64_t cong(uint64_t max, uint64_t unbias, uint64_t *seed)
{
while ((*seed = 69069 * (*seed) + 362437) > unbias)
;
return *seed % max;
}

// libuv stuff:
JL_DLLEXPORT extern void *jl_dl_handle;
JL_DLLEXPORT extern void *jl_RTLD_DEFAULT_handle;
Expand Down
3 changes: 2 additions & 1 deletion src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ typedef struct _jl_excstack_t jl_excstack_t;
struct _jl_tls_states_t {
struct _jl_gcframe_t *pgcstack;
size_t world_age;
int16_t tid;
uint64_t rngseed;
volatile size_t *safepoint;
// Whether it is safe to execute GC at the same time.
#define JL_GC_STATE_WAITING 1
Expand All @@ -158,7 +160,6 @@ struct _jl_tls_states_t {
size_t stacksize;
jl_ucontext_t base_ctx; // base context of stack
jl_jmp_buf *safe_restore;
int16_t tid;
// Temp storage for exception thrown in signal handler. Not rooted.
struct _jl_value_t *sig_exception;
// Temporary backtrace buffer. Scanned for gc roots when bt_size > 0.
Expand Down
16 changes: 16 additions & 0 deletions src/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ static inline void jl_mutex_lock(jl_mutex_t *lock)
jl_gc_enable_finalizers(ptls, 0);
}

static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock)
{
unsigned long self = jl_thread_self();
unsigned long owner = jl_atomic_load_acquire(&lock->owner);
if (owner == self) {
lock->count++;
return 1;
}
if (owner == 0 &&
jl_atomic_compare_exchange(&lock->owner, 0, self) == 0) {
lock->count = 1;
return 1;
}
return 0;
}

/* Call this function for code that could be called from either a managed
or an unmanaged thread */
static inline void jl_mutex_lock_maybe_nogc(jl_mutex_t *lock)
Expand Down
19 changes: 19 additions & 0 deletions src/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@
#define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
#define DEFAULT_MACHINE_EXCLUSIVE 0

// partr -- parallel tasks runtime options ------------------------------------

// multiq
// number of heaps = MULTIQ_HEAP_C * nthreads
#define MULTIQ_HEAP_C 4
// how many in each heap
#define MULTIQ_TASKS_PER_HEAP 129

// parfor
// tasks = niters / (GRAIN_K * nthreads)
#define GRAIN_K 4

// synchronization
// narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1
// limit for number of recursive parfors
#define ARRIVERS_P 2
// nreducers = narrivers * REDUCERS_FRAC
#define REDUCERS_FRAC 1


// sanitizer defaults ---------------------------------------------------------

Expand Down
Loading

0 comments on commit 6a28aa9

Please sign in to comment.