Skip to content

Commit

Permalink
make Task state field more efficient (JuliaLang#36811)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed Aug 11, 2020
1 parent 894ee04 commit 9be01af
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 40 deletions.
2 changes: 1 addition & 1 deletion base/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ end

function show(io::IO, ::MIME"text/plain", t::Task)
show(io, t)
if t.state === :failed
if istaskfailed(t)
println(io)
show_task_exception(io, t)
end
Expand Down
40 changes: 32 additions & 8 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,30 @@ Get the currently running [`Task`](@ref).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

# task states

const task_state_runnable = UInt8(0)
const task_state_done = UInt8(1)
const task_state_failed = UInt8(2)

@inline function getproperty(t::Task, field::Symbol)
if field === :state
# TODO: this field name should be deprecated in 2.0
st = getfield(t, :_state)
if st === task_state_runnable
return :runnable
elseif st === task_state_done
return :done
elseif st === task_state_failed
return :failed
else
@assert false
end
else
return getfield(t, field)
end
end

"""
istaskdone(t::Task) -> Bool
Expand All @@ -143,7 +167,7 @@ julia> istaskdone(b)
true
```
"""
istaskdone(t::Task) = ((t.state === :done) | istaskfailed(t))
istaskdone(t::Task) = t._state !== task_state_runnable

"""
istaskstarted(t::Task) -> Bool
Expand Down Expand Up @@ -184,7 +208,7 @@ julia> istaskfailed(b)
true
```
"""
istaskfailed(t::Task) = (t.state === :failed)
istaskfailed(t::Task) = (t._state === task_state_failed)

Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)

Expand Down Expand Up @@ -433,7 +457,7 @@ function task_done_hook(t::Task)

if err && !handled && Threads.threadid() == 1
if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) &&
active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result) # this terminates the task
end
Expand All @@ -448,7 +472,7 @@ function task_done_hook(t::Task)
# issue #19467
if Threads.threadid() == 1 &&
isa(e, InterruptException) && isdefined(Base, :active_repl_backend) &&
active_repl_backend.backend_task.state === :runnable && isempty(Workqueue) &&
active_repl_backend.backend_task._state === task_state_runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, e)
else
Expand Down Expand Up @@ -525,7 +549,7 @@ function __preinit_threads__()
end

function enq_work(t::Task)
(t.state === :runnable && t.queue === nothing) || error("schedule: Task not runnable")
(t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
tid = Threads.threadid(t)
# Note there are three reasons a Task might be put into a sticky queue
# even if t.sticky == false:
Expand Down Expand Up @@ -585,7 +609,7 @@ true
"""
function schedule(t::Task, @nospecialize(arg); error=false)
# schedule a task to be (re)started with the given value or exception
t.state === :runnable || Base.error("schedule: Task not runnable")
t._state === task_state_runnable || Base.error("schedule: Task not runnable")
if error
t.queue === nothing || Base.list_deletefirst!(t.queue, t)
setfield!(t, :exception, arg)
Expand Down Expand Up @@ -671,7 +695,7 @@ end
function ensure_rescheduled(othertask::Task)
ct = current_task()
W = Workqueues[Threads.threadid()]
if ct !== othertask && othertask.state === :runnable
if ct !== othertask && othertask._state === task_state_runnable
# we failed to yield to othertask
# return it to the head of a queue to be retried later
tid = Threads.threadid(othertask)
Expand All @@ -688,7 +712,7 @@ end
function trypoptask(W::StickyWorkqueue)
isempty(W) && return
t = popfirst!(W)
if t.state !== :runnable
if t._state !== task_state_runnable
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
Expand Down
6 changes: 3 additions & 3 deletions src/jltypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -2455,27 +2455,27 @@ void jl_init_types(void) JL_GC_DISABLED
"next",
"queue",
"storage",
"state",
"donenotify",
"result",
"exception",
"backtrace",
"logstate",
"code",
"_state",
"sticky"),
jl_svec(11,
jl_any_type,
jl_any_type,
jl_any_type,
jl_symbol_type,
jl_any_type,
jl_any_type,
jl_any_type,
jl_any_type,
jl_any_type,
jl_any_type,
jl_uint8_type,
jl_bool_type),
0, 1, 9);
0, 1, 8);
jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type);
jl_svecset(jl_task_type->types, 0, listt);

Expand Down
10 changes: 7 additions & 3 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1791,20 +1791,22 @@ typedef struct _jl_task_t {
jl_value_t *next; // invasive linked list for scheduler
jl_value_t *queue; // invasive linked list for scheduler
jl_value_t *tls;
jl_sym_t *state;
jl_value_t *donenotify;
jl_value_t *result;
jl_value_t *exception;
jl_value_t *backtrace;
jl_value_t *logstate;
jl_function_t *start;
uint8_t _state;
uint8_t sticky; // record whether this Task can be migrated to a new thread

// hidden state:
// id of owning thread - does not need to be defined until the task runs
int16_t tid;
// multiqueue priority
int16_t prio;
// current world age
size_t world_age;

jl_ucontext_t ctx; // saved thread state
void *stkbuf; // malloc'd memory (either copybuf or stack)
Expand All @@ -1818,12 +1820,14 @@ typedef struct _jl_task_t {
jl_gcframe_t *gcstack;
// saved exception stack
jl_excstack_t *excstack;
// current world age
size_t world_age;

jl_timing_block_t *timing_stack;
} jl_task_t;

#define JL_TASK_STATE_RUNNABLE 0
#define JL_TASK_STATE_DONE 1
#define JL_TASK_STATE_FAILED 2

JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t*, jl_value_t*, size_t);
JL_DLLEXPORT void jl_switchto(jl_task_t **pt);
JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e JL_MAYBE_UNROOTED);
Expand Down
1 change: 0 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,6 @@ extern jl_sym_t *nospecialize_sym; extern jl_sym_t *macrocall_sym;
extern jl_sym_t *colon_sym; extern jl_sym_t *hygienicscope_sym;
extern jl_sym_t *throw_undef_if_not_sym; extern jl_sym_t *getfield_undefref_sym;
extern jl_sym_t *gc_preserve_begin_sym; extern jl_sym_t *gc_preserve_end_sym;
extern jl_sym_t *failed_sym; extern jl_sym_t *done_sym; extern jl_sym_t *runnable_sym;
extern jl_sym_t *coverageeffect_sym; extern jl_sym_t *escape_sym;
extern jl_sym_t *optlevel_sym;
extern jl_sym_t *atom_sym; extern jl_sym_t *statement_sym; extern jl_sym_t *all_sym;
Expand Down
3 changes: 1 addition & 2 deletions src/stackwalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ JL_DLLEXPORT jl_value_t *jl_get_excstack(jl_task_t* task, int include_bt, int ma
{
JL_TYPECHK(catch_stack, task, (jl_value_t*)task);
jl_ptls_t ptls = jl_get_ptls_states();
if (task != ptls->current_task &&
task->state != failed_sym && task->state != done_sym) {
if (task != ptls->current_task && task->_state == JL_TASK_STATE_RUNNABLE) {
jl_error("Inspecting the exception stack of a task which might "
"be running concurrently isn't allowed.");
}
Expand Down
22 changes: 6 additions & 16 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ volatile int jl_in_stackwalk = 0;
#define STATIC_OR_JS static
#endif

jl_sym_t *done_sym;
jl_sym_t *failed_sym;
jl_sym_t *runnable_sym;

extern size_t jl_page_size;
static char *jl_alloc_fiber(jl_ucontext_t *t, size_t *ssize, jl_task_t *owner) JL_NOTSAFEPOINT;
STATIC_OR_JS void jl_set_fiber(jl_ucontext_t *t);
Expand Down Expand Up @@ -194,9 +190,9 @@ void JL_NORETURN jl_finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNR
t->result = resultval;
jl_gc_wb(t, t->result);
if (t->exception != jl_nothing)
jl_atomic_store_release(&t->state, failed_sym);
jl_atomic_store_release(&t->_state, JL_TASK_STATE_FAILED);
else
jl_atomic_store_release(&t->state, done_sym);
jl_atomic_store_release(&t->_state, JL_TASK_STATE_DONE);
if (t->copy_stack) // early free of stkbuf
t->stkbuf = NULL;
// ensure that state is cleared
Expand Down Expand Up @@ -302,8 +298,7 @@ static void ctx_switch(jl_ptls_t ptls)
}
#endif


int killed = (lastt->state == done_sym || lastt->state == failed_sym);
int killed = lastt->_state != JL_TASK_STATE_RUNNABLE;
if (!t->started && !t->copy_stack) {
// may need to allocate the stack
if (t->stkbuf == NULL) {
Expand Down Expand Up @@ -450,8 +445,7 @@ JL_DLLEXPORT void jl_switch(void)
if (t == ct) {
return;
}
if (t->state == done_sym || t->state == failed_sym ||
(t->started && t->stkbuf == NULL)) {
if (t->_state != JL_TASK_STATE_RUNNABLE || (t->started && t->stkbuf == NULL)) {
ct->exception = t->exception;
ct->result = t->result;
return;
Expand Down Expand Up @@ -635,7 +629,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
t->next = jl_nothing;
t->queue = jl_nothing;
t->tls = jl_nothing;
t->state = runnable_sym;
t->_state = JL_TASK_STATE_RUNNABLE;
t->start = start;
t->result = jl_nothing;
t->donenotify = completion_future;
Expand Down Expand Up @@ -729,10 +723,6 @@ void JL_DLLEXPORT jl_schedule_task(jl_task_t *task)
// Do one-time initializations for task system
void jl_init_tasks(void) JL_GC_DISABLED
{
done_sym = jl_symbol("done");
failed_sym = jl_symbol("failed");
runnable_sym = jl_symbol("runnable");

char *acs = getenv("JULIA_COPY_STACKS");
if (acs) {
if (!strcmp(acs, "1") || !strcmp(acs, "yes"))
Expand Down Expand Up @@ -1210,7 +1200,7 @@ void jl_init_root_task(void *stack_lo, void *stack_hi)
ptls->current_task->started = 1;
ptls->current_task->next = jl_nothing;
ptls->current_task->queue = jl_nothing;
ptls->current_task->state = runnable_sym;
ptls->current_task->_state = JL_TASK_STATE_RUNNABLE;
ptls->current_task->start = NULL;
ptls->current_task->result = jl_nothing;
ptls->current_task->donenotify = jl_nothing;
Expand Down
8 changes: 4 additions & 4 deletions stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ function serialize(s::ClusterSerializer, t::Task)
end
serialize(s, bt)
end
serialize(s, t.state)
serialize(s, t._state)
serialize(s, t.result)
serialize(s, t.exception)
end
Expand Down Expand Up @@ -257,11 +257,11 @@ function deserialize(s::ClusterSerializer, ::Type{Task})
t.code = deserialize(s)
t.storage = deserialize(s)
state_or_bt = deserialize(s)
if state_or_bt isa Symbol
t.state = state_or_bt
if state_or_bt isa UInt8
t._state = state_or_bt
else
t.backtrace = state_or_bt
t.state = deserialize(s)
t._state = deserialize(s)
end
t.result = deserialize(s)
t.exception = deserialize(s)
Expand Down
11 changes: 10 additions & 1 deletion stdlib/Serialization/src/Serialization.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,16 @@ function deserialize(s::AbstractSerializer, ::Type{Task})
deserialize_cycle(s, t)
t.code = deserialize(s)
t.storage = deserialize(s)
t.state = deserialize(s)
state = deserialize(s)
if state === :runnable
t._state = Base.task_state_runnable
elseif state === :done
t._state = Base.task_state_done
elseif state === :failed
t._state = Base.task_state_failed
else
@assert false
end
t.result = deserialize(s)
t.exception = deserialize(s)
t
Expand Down
2 changes: 1 addition & 1 deletion test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ end
"""
# test for invalid state in Workqueue during yield
t = @async nothing
t.state = :invalid
t._state = 66
newstderr = redirect_stderr()
try
errstream = @async read(newstderr[1], String)
Expand Down

0 comments on commit 9be01af

Please sign in to comment.