Skip to content

Commit

Permalink
complete thread sleep/wakeup logic and integrate with I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash authored and JeffBezanson committed May 24, 2019
1 parent 4752908 commit 93e3d28
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 264 deletions.
5 changes: 2 additions & 3 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ mutable struct Timer
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)

ccall(:jl_uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:jl_uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
ccall(:uv_update_time, Cvoid, (Ptr{Cvoid},), eventloop())
ccall(:uv_timer_start, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64),
this, uv_jl_timercb::Ptr{Cvoid},
UInt64(round(timeout * 1000)) + 1, UInt64(round(interval * 1000)))
return this
Expand All @@ -124,7 +124,6 @@ isopen(t::Union{Timer, AsyncCondition}) = t.isopen
function close(t::Union{Timer, AsyncCondition})
if t.handle != C_NULL && isopen(t)
t.isopen = false
isa(t, Timer) && ccall(:jl_uv_timer_stop, Cint, (Ptr{Cvoid},), t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
nothing
Expand Down
30 changes: 7 additions & 23 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
t === current_task() && error("deadlock detected: cannot wait on current task")
if !istaskdone(t)
lock(t.donenotify)
try
Expand All @@ -197,6 +198,7 @@ function wait(t::Task)
if istaskfailed(t)
throw(t.exception)
end
nothing
end

fetch(@nospecialize x) = x
Expand Down Expand Up @@ -560,29 +562,11 @@ function trypoptask(W::StickyWorkqueue)
end

@noinline function poptaskref(W::StickyWorkqueue)
gettask = () -> trypoptask(W)
task = ccall(:jl_task_get_next, Any, (Any,), gettask)
## Below is a reference implementation for `jl_task_get_next`, which currently lives in C
#local task
#while true
# task = trypoptask(W)
# task === nothing || break
# if !Threads.in_threaded_loop[] && Threads.threadid() == 1
# if process_events(true) == 0
# task = trypoptask(W)
# task === nothing || break
# # if there are no active handles and no runnable tasks, just
# # wait for signals.
# pause()
# end
# else
# if Threads.threadid() == 1
# process_events(false)
# end
# ccall(:jl_gc_safepoint, Cvoid, ())
# ccall(:jl_cpu_pause, Cvoid, ())
# end
#end
task = trypoptask(W)
if !(task isa Task)
gettask = () -> trypoptask(W)
task = ccall(:jl_task_get_next, Any, (Any,), gettask)::Task
end
return Ref(task)
end

Expand Down
10 changes: 5 additions & 5 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ static void jl_uv_exitcleanup_add(uv_handle_t *handle, struct uv_shutdown_queue
struct uv_shutdown_queue_item *item = (struct uv_shutdown_queue_item*)malloc(sizeof(struct uv_shutdown_queue_item));
item->h = handle;
item->next = NULL;
JL_UV_LOCK();
if (queue->last) queue->last->next = item;
if (!queue->first) queue->first = item;
if (queue->last)
queue->last->next = item;
if (!queue->first)
queue->first = item;
queue->last = item;
JL_UV_UNLOCK();
}

static void jl_uv_exitcleanup_walk(uv_handle_t *handle, void *arg)
Expand Down Expand Up @@ -297,8 +297,8 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode)

// force libuv to spin until everything has finished closing
loop->stop_flag = 0;
JL_UV_UNLOCK();
while (uv_run(loop, UV_RUN_DEFAULT)) { }
JL_UV_UNLOCK();

// TODO: Destroy threads

Expand Down
43 changes: 11 additions & 32 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ JL_DLLEXPORT void jl_uv_req_set_data(uv_req_t *req, void *data) { req->data = da
JL_DLLEXPORT void *jl_uv_handle_data(uv_handle_t *handle) { return handle->data; }
JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; }

extern volatile unsigned _threadedregion;

JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
if (loop) {
if (loop && (_threadedregion || ptls->tid == 0)) {
jl_gc_safepoint_(ptls);
JL_UV_LOCK();
loop->stop_flag = 0;
int r = uv_run(loop,UV_RUN_ONCE);
int r = uv_run(loop, UV_RUN_ONCE);
JL_UV_UNLOCK();
return r;
}
Expand All @@ -207,13 +209,14 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
if (loop) {
if (loop && (_threadedregion || ptls->tid == 0)) {
jl_gc_safepoint_(ptls);
JL_UV_LOCK();
loop->stop_flag = 0;
int r = uv_run(loop,UV_RUN_NOWAIT);
JL_UV_UNLOCK();
return r;
if (jl_mutex_trylock(&jl_uv_mutex)) {
loop->stop_flag = 0;
int r = uv_run(loop, UV_RUN_NOWAIT);
JL_UV_UNLOCK();
return r;
}
}
return 0;
}
Expand Down Expand Up @@ -1090,30 +1093,6 @@ JL_DLLEXPORT void jl_uv_stop(uv_loop_t* loop)
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_uv_update_time(uv_loop_t* loop)
{
JL_UV_LOCK();
uv_update_time(loop);
JL_UV_UNLOCK();
}

JL_DLLEXPORT int jl_uv_timer_start(uv_timer_t* handle, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat)
{
JL_UV_LOCK();
int r = uv_timer_start(handle, cb, timeout, repeat);
JL_UV_UNLOCK();
return r;
}

JL_DLLEXPORT int jl_uv_timer_stop(uv_timer_t* handle)
{
JL_UV_LOCK();
int r = uv_timer_stop(handle);
JL_UV_UNLOCK();
return r;
}

JL_DLLEXPORT int jl_uv_fs_scandir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags,
uv_fs_cb cb)
{
Expand Down
8 changes: 7 additions & 1 deletion src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,14 @@ struct _jl_tls_states_t {
volatile int8_t gc_state;
volatile int8_t in_finalizer;
int8_t disable_gc;
jl_thread_heap_t heap;
uv_mutex_t sleep_lock;
uv_cond_t wake_signal;
volatile sig_atomic_t defer_signal;
struct _jl_task_t *current_task;
#ifdef MIGRATE_TASKS
struct _jl_task_t *previous_task;
#endif
struct _jl_task_t *root_task;
void *stackbase;
size_t stacksize;
Expand All @@ -173,7 +178,6 @@ struct _jl_tls_states_t {
// this is limited to the few places we do synchronous IO
// we can make this more general (similar to defer_signal) if necessary
volatile sig_atomic_t io_wait;
jl_thread_heap_t heap;
#ifndef _OS_WINDOWS_
// These are only used on unix now
pthread_t system_id;
Expand Down Expand Up @@ -289,6 +293,8 @@ JL_DLLEXPORT void (jl_gc_safepoint)(void);

JL_DLLEXPORT void jl_gc_enable_finalizers(jl_ptls_t ptls, int on);

JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);

#ifdef __cplusplus
}
#endif
Expand Down
5 changes: 4 additions & 1 deletion src/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,14 @@
#define JL_STACK_SIZE (2*1024*1024)
#endif

// allow a suspended Task to restart on a different thread
//#define MIGRATE_TASKS

// threading options ----------------------------------------------------------

// controls for when threads sleep
#define THREAD_SLEEP_THRESHOLD_NAME "JULIA_THREAD_SLEEP_THRESHOLD"
#define DEFAULT_THREAD_SLEEP_THRESHOLD 4e9 // cycles (4e9==4sec@1GHz)
#define DEFAULT_THREAD_SLEEP_THRESHOLD 4*1000*1000 // nanoseconds (4ms)

// defaults for # threads
#define NUM_THREADS_NAME "JULIA_NUM_THREADS"
Expand Down
Loading

0 comments on commit 93e3d28

Please sign in to comment.