Skip to content

Commit

Permalink
Make fetch/wait on Task consistend with RemoteRefs [Closes JuliaLang#…
Browse files Browse the repository at this point in the history
  • Loading branch information
iamed2 authored and JeffBezanson committed Feb 9, 2018
1 parent 16620b6 commit 94197a4
Show file tree
Hide file tree
Showing 23 changed files with 80 additions and 62 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ Deprecated or removed
* The fallback method `^(x, p::Integer)` is deprecated. If your type relied on this definition,
add a method such as `^(x::MyType, p::Integer) = Base.power_by_squaring(x, p)` ([#23332]).

* `wait` and `fetch` on `Task` now resemble the interface of `Future`

Command-line option changes
---------------------------

Expand Down
4 changes: 2 additions & 2 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function maptwice(wrapped_f, chnl, worker_tasks, c...)
close(chnl)

# check and throw any exceptions from the worker tasks
foreach(x->(v=wait(x); isa(v, Exception) && throw(v)), worker_tasks)
foreach(x->(v=fetch(x); isa(v, Exception) && throw(v)), worker_tasks)

# check if there was a genuine problem with asyncrun
(asyncrun_excp !== nothing) && throw(asyncrun_excp)
Expand Down Expand Up @@ -327,7 +327,7 @@ function done(itr::AsyncCollector, state::AsyncCollectorState)
close(state.chnl)

# wait for all tasks to finish
foreach(x->(v=wait(x); isa(v, Exception) && throw(v)), state.worker_tasks)
foreach(x->(v=fetch(x); isa(v, Exception) && throw(v)), state.worker_tasks)
empty!(state.worker_tasks)
return true
else
Expand Down
3 changes: 3 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,9 @@ end
# PR #23332
@deprecate ^(x, p::Integer) Base.power_by_squaring(x,p)

# issue #25928
@deprecate wait(t::Task) fetch(t)

# END 0.7 deprecations

# BEGIN 1.0 deprecations
Expand Down
4 changes: 2 additions & 2 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ Block the current task until some event occurs, depending on the type of the arg
* [`Condition`](@ref): Wait for [`notify`](@ref) on a condition.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
can be used to determine success or failure.
* [`Task`](@ref): Wait for a `Task` to finish, returning its result value. If the task fails
with an exception, the exception is propagated (re-thrown in the task that called `wait`).
* [`Task`](@ref): Wait for a `Task` to finish. If the task fails with an exception, the
exception is propagated (re-thrown in the task that called `wait`).
* `RawFD`: Wait for changes on a file descriptor (see the `FileWatching` package).
If no argument is passed, the task blocks for an undefined period. A task can only be
Expand Down
19 changes: 16 additions & 3 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ function task_local_storage(body::Function, key, val)
end

# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
# TODO: rename to wait for 1.0
function _wait(t::Task)
if !istaskdone(t)
if t.donenotify === nothing
t.donenotify = Condition()
Expand All @@ -183,7 +184,19 @@ function wait(t::Task)
if istaskfailed(t)
throw(t.exception)
end
return task_result(t)
end

_wait(not_a_task) = wait(not_a_task)

"""
fetch(t::Task)
Wait for a Task to finish, then return its result value. If the task fails with an
exception, the exception is propagated (re-thrown in the task that called fetch).
"""
function fetch(t::Task)
_wait(t)
task_result(t)
end

suppress_excp_printing(t::Task) = isa(t.storage, IdDict) ? get(get_task_tls(t), :SUPPRESS_EXCEPTION_PRINTING, false) : false
Expand Down Expand Up @@ -266,7 +279,7 @@ function sync_end()
c_ex = CompositeException()
for r in refs
try
wait(r)
_wait(r)
catch ex
if !isa(r, Task) || (isa(r, Task) && !istaskfailed(r))
rethrow(ex)
Expand Down
2 changes: 1 addition & 1 deletion examples/embedding/embedding-test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ end
stderr = read(stderr, String)
@test stderr == "MethodError: no method matching this_function_has_no_methods()\n"
@test success(p)
lines = wait(stdout_task)
lines = fetch(stdout_task)
@test length(lines) == 10
@test parse(Float64, lines[1]) sqrt(2)
@test lines[8] == "called bar"
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ function read_worker_host_port(io::IO)
end
!istaskdone(readtask) && break

conninfo = wait(readtask)
conninfo = fetch(readtask)
if isempty(conninfo) && !isopen(io)
error("Unable to read host:port string from worker. Launch command exited with error?")
end
Expand Down Expand Up @@ -416,7 +416,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
end
end

wait(t_launch) # catches any thrown errors from the launch task
Base._wait(t_launch) # catches any thrown errors from the launch task

# Since all worker-to-worker setups may not have completed by the time this
# function returns to the caller, send the complete list to all workers.
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ function preduce(reducer, f, R)
schedule(t)
push!(w_exec, t)
end
reduce(reducer, [wait(t) for t in w_exec])
reduce(reducer, [fetch(t) for t in w_exec])
end

function pfor(f, R)
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
end
end

for wt in wait_tasks; wait(wt); end
for wt in wait_tasks; Base._wait(wt); end

send_connection_hdr(controller, false)
send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_CORES, getpid()))
Expand Down
8 changes: 4 additions & 4 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ end # full-test

let t = @task 42
schedule(t, ErrorException(""), error=true)
@test_throws ErrorException wait(t)
@test_throws ErrorException Base._wait(t)
end

# issue #8207
Expand All @@ -716,7 +716,7 @@ let t = schedule(@task f13168(100))
yield()
@test t.state == :done
@test_throws ErrorException schedule(t)
@test isa(wait(t),Float64)
@test isa(fetch(t),Float64)
end

# issue #13122
Expand Down Expand Up @@ -1004,7 +1004,7 @@ for i in 1:5
p = addprocs_with_testenv(1)[1]
np = nprocs()
@spawnat p sleep(5)
wait(rmprocs(p; waitfor=0))
Base._wait(rmprocs(p; waitfor=0))
for pid in procs()
@test pid == remotecall_fetch(myid, pid)
end
Expand Down Expand Up @@ -1092,7 +1092,7 @@ for (addp_testf, expected_errstr, env) in testruns
catch ex
redirect_stdout(old_stdout)
close(stdout_in)
@test isempty(wait(stdout_txt))
@test isempty(fetch(stdout_txt))
@test isa(ex, CompositeException)
@test ex.exceptions[1].ex.msg == expected_errstr
end
Expand Down
10 changes: 5 additions & 5 deletions stdlib/FileWatching/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function pfd_tst_reads(idx, intvl)
@test !evt.timedout
@test evt.readable
@test !evt.writable
@test evt === wait(evt2)
@test evt === fetch(evt2)

# println("Expected ", intvl, ", actual ", t_elapsed, ", diff ", t_elapsed - intvl)
# Disabled since this assertion fails randomly, notably on build VMs (issue #12824)
Expand All @@ -63,7 +63,7 @@ function pfd_tst_timeout(idx, intvl)
@test evt.timedout
@test !evt.readable
@test !evt.writable
@test evt === wait(evt2)
@test evt === fetch(evt2)
end

# Disabled since these assertions fail randomly, notably on build VMs (issue #12824)
Expand Down Expand Up @@ -107,7 +107,7 @@ for (i, intvl) in enumerate(intvls)
end
notify(ready_c, all=true)
for idx in 1:n
wait(t[idx])
Base._wait(t[idx])
end
end
end
Expand Down Expand Up @@ -226,7 +226,7 @@ end

function test_watch_file_timeout(tval)
watch = @async watch_file(file, tval)
@test wait(watch) == FileWatching.FileEvent(false, false, true)
@test fetch(watch) == FileWatching.FileEvent(false, false, true)
end

function test_watch_file_change(tval)
Expand All @@ -235,7 +235,7 @@ function test_watch_file_change(tval)
open(file, "a") do f
write(f, "small change\n")
end
@test wait(watch) == FileWatching.FileEvent(false, true, false)
@test fetch(watch) == FileWatching.FileEvent(false, true, false)
end

function test_monitor_wait(tval)
Expand Down
14 changes: 7 additions & 7 deletions stdlib/REPL/test/repl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function fake_repl(f; options::REPL.Options=REPL.Options(confirm_exit=false))
end
@test read(stderr.out, String) == ""
#display(read(stdout.out, String))
wait(t)
Base._wait(t)
nothing
end

Expand Down Expand Up @@ -185,7 +185,7 @@ fake_repl() do stdin_write, stdout_read, repl
redirect_stdout(old_stdout)
end
close(proc_stdout)
@test wait(get_stdout) == "HI\n"
@test fetch(get_stdout) == "HI\n"
end

# Issue #7001
Expand Down Expand Up @@ -249,7 +249,7 @@ fake_repl() do stdin_write, stdout_read, repl

# Close REPL ^D
write(stdin_write, '\x04')
wait(repltask)
Base._wait(repltask)
end

function buffercontents(buf::IOBuffer)
Expand Down Expand Up @@ -618,7 +618,7 @@ fake_repl() do stdin_write, stdout_read, repl

# Close repl
write(stdin_write, '\x04')
wait(repltask)
Base._wait(repltask)
end

# Simple non-standard REPL tests
Expand Down Expand Up @@ -658,7 +658,7 @@ fake_repl() do stdin_write, stdout_read, repl
@test wait(c) == "a"
# Close REPL ^D
write(stdin_write, '\x04')
wait(repltask)
Base._wait(repltask)
end

ccall(:jl_exit_on_sigint, Cvoid, (Cint,), 1)
Expand Down Expand Up @@ -829,7 +829,7 @@ for keys = [altkeys, merge(altkeys...)],

# Close REPL ^D
write(stdin_write, '\x04')
wait(repltask)
Base._wait(repltask)

# Close the history file
# (otherwise trying to delete it fails on Windows)
Expand Down Expand Up @@ -886,7 +886,7 @@ fake_repl() do stdin_write, stdout_read, repl

# Close REPL ^D
write(stdin_write, '\x04')
wait(repltask)
Base._wait(repltask)
end

# Docs.helpmode tests: we test whether the correct expressions are being generated here,
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Serialization/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ eval(Main, main_ex)
create_serialization_stream() do s # user-defined type array
f = () -> begin task_local_storage(:v, 2); return 1+1 end
t = Task(f)
wait(schedule(t))
Base._wait(schedule(t))
serialize(s, t)
seek(s, 0)
r = deserialize(s)
Expand All @@ -349,7 +349,7 @@ end
struct MyErrorTypeTest <: Exception end
create_serialization_stream() do s # user-defined type array
t = Task(()->throw(MyErrorTypeTest()))
@test_throws MyErrorTypeTest wait(schedule(t))
@test_throws MyErrorTypeTest Base._wait(schedule(t))
serialize(s, t)
seek(s, 0)
r = deserialize(s)
Expand Down
8 changes: 4 additions & 4 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ end
redirect_stderr(oldstderr)
close(newstderr[2])
end
wait(t)
Base._wait(t)
@test run[] == 3
@test wait(errstream) == """
@test fetch(errstream) == """
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
Expand All @@ -266,7 +266,7 @@ end
redirect_stderr(oldstderr)
close(newstderr[2])
end
@test wait(errstream) == "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n"
@test fetch(errstream) == "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n"
end

@testset "schedule_and_wait" begin
Expand All @@ -286,7 +286,7 @@ end
testerr = ErrorException("expected")
@async Base.throwto(t, testerr)
@test try
wait(t)
Base._wait(t)
false
catch ex
ex
Expand Down
4 changes: 2 additions & 2 deletions test/cmdlineargs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ let exename = `$(Base.julia_cmd()) --sysimage-native-code=yes --startup-file=no`
close(out.in)
wait(proc)
@test success(proc)
@test wait(output) == "WARNING: Foo.Deprecated is deprecated, use NotDeprecated instead.\n likely near no file:5"
@test fetch(output) == "WARNING: Foo.Deprecated is deprecated, use NotDeprecated instead.\n likely near no file:5"
end

let out = Pipe(),
Expand All @@ -243,7 +243,7 @@ let exename = `$(Base.julia_cmd()) --sysimage-native-code=yes --startup-file=no`
wait(proc)
close(out.in)
@test success(proc)
@test wait(output) == ""
@test fetch(output) == ""
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/meta.jl
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ let oldout = STDOUT
redirect_stdout(oldout)
close(wrout)

@test wait(out) == """
@test fetch(out) == """
Expr
head: Symbol call
args: Array{Any}((3,))
Expand Down
6 changes: 3 additions & 3 deletions test/misc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ let l = ReentrantLock()
@test false
end === false
end
wait(t)
Base._wait(t)
unlock(l)
@test_throws ErrorException unlock(l)
end
Expand All @@ -127,9 +127,9 @@ end
@noinline function f6597(c)
t = @schedule nothing
finalizer(t -> c[] += 1, t)
wait(t)
Base._wait(t)
@test c[] == 0
wait(t)
Base._wait(t)
nothing
end
let c = Ref(0),
Expand Down
4 changes: 2 additions & 2 deletions test/read.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function cleanup()
end
empty!(open_streams)
for tsk in tasks
wait(tsk)
Base._wait(tsk)
end
empty!(tasks)
end
Expand Down Expand Up @@ -563,7 +563,7 @@ let p = Pipe()
end
@async close(p.in)
end
s = reinterpret(UInt16, wait(t))
s = reinterpret(UInt16, fetch(t))
@test length(s) == 660_000 + typemax(UInt16)
@test s[(end - typemax(UInt16)):end] == UInt16.(0:typemax(UInt16))
end
Loading

0 comments on commit 94197a4

Please sign in to comment.