Skip to content

Commit

Permalink
enable partr backend
Browse files Browse the repository at this point in the history
drop/fix more broken code
drop jl_thread_sleep_threshold deadcode
improve jl_wakeup_thread
  • Loading branch information
vtjnash authored and JeffBezanson committed Mar 21, 2019
1 parent 43b188c commit d9d8d4c
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 199 deletions.
59 changes: 33 additions & 26 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,17 @@ end

function enq_work(t::Task)
(t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable")
tid = (t.sticky ? Threads.threadid(t) : 0)
if tid == 0
tid = Threads.threadid()
if t.sticky
tid = Threads.threadid(t)
if tid == 0
tid = Threads.threadid()
end
push!(Workqueues[tid], t)
else
tid = 0
ccall(:jl_enqueue_task, Cvoid, (Any,), t)
end
push!(Workqueues[tid], t)
tid == 1 && ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16)
return t
end

Expand Down Expand Up @@ -603,30 +608,32 @@ function trypoptask(W::StickyWorkqueue)
end

@noinline function poptaskref(W::StickyWorkqueue)
local task
while true
task = trypoptask(W)
task === nothing || break
if !Threads.in_threaded_loop[] && Threads.threadid() == 1
if process_events(true) == 0
task = trypoptask(W)
task === nothing || break
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
if Threads.threadid() == 1
process_events(false)
end
ccall(:jl_gc_safepoint, Cvoid, ())
ccall(:jl_cpu_pause, Cvoid, ())
end
end
gettask = () -> trypoptask(W)
task = ccall(:jl_task_get_next, Any, (Any,), gettask)
## Below is a reference implementation for `jl_task_get_next`, which currently lives in C
#local task
#while true
# task = trypoptask(W)
# task === nothing || break
# if !Threads.in_threaded_loop[] && Threads.threadid() == 1
# if process_events(true) == 0
# task = trypoptask(W)
# task === nothing || break
# # if there are no active handles and no runnable tasks, just
# # wait for signals.
# pause()
# end
# else
# if Threads.threadid() == 1
# process_events(false)
# end
# ccall(:jl_gc_safepoint, Cvoid, ())
# ccall(:jl_cpu_pause, Cvoid, ())
# end
#end
return Ref(task)
end


function wait()
W = Workqueues[Threads.threadid()]
reftask = poptaskref(W)
Expand Down
6 changes: 0 additions & 6 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1634,12 +1634,10 @@ STATIC_INLINE int gc_mark_queue_obj(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_
return (int)nptr;
}

#ifdef JULIA_ENABLE_PARTR
int jl_gc_mark_queue_obj_explicit(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp, jl_value_t *obj)
{
return gc_mark_queue_obj(gc_cache, sp, obj);
}
#endif

JL_DLLEXPORT int jl_gc_mark_queue_obj(jl_ptls_t ptls, jl_value_t *obj)
{
Expand Down Expand Up @@ -2490,20 +2488,16 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
gc_mark_queue_obj(gc_cache, sp, ptls2->previous_exception);
}

#ifdef JULIA_ENABLE_PARTR
void jl_gc_mark_enqueued_tasks(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp);
#endif

// mark the initial root set
static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
{
// modules
gc_mark_queue_obj(gc_cache, sp, jl_main_module);

#ifdef JULIA_ENABLE_PARTR
// tasks
jl_gc_mark_enqueued_tasks(gc_cache, sp);
#endif

// invisible builtin values
if (jl_an_empty_vec_any != NULL)
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ void _julia_init(JL_IMAGE_SEARCH rel)
ptls->world_age = last_age;
}
}
else {
// nthreads > 1 requires code in Base
jl_n_threads = 1;
}
jl_start_threads();

// This needs to be after jl_start_threads
Expand Down
3 changes: 1 addition & 2 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1647,9 +1647,8 @@ typedef struct _jl_task_t {
jl_ucontext_t ctx; // saved thread state
void *stkbuf; // malloc'd memory (either copybuf or stack)
size_t bufsz; // actual sizeof stkbuf
unsigned int copy_stack; // sizeof stack for copybuf
unsigned int copy_stack:31; // sizeof stack for copybuf
unsigned int started:1;
unsigned int sticky:1;

// current exception handler
jl_handler_t *eh;
Expand Down
Loading

0 comments on commit d9d8d4c

Please sign in to comment.