Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error Handling in Tasks #32677

Closed
rohanmclure opened this issue Jul 25, 2019 · 11 comments
Closed

Error Handling in Tasks #32677

rohanmclure opened this issue Jul 25, 2019 · 11 comments
Labels
error handling Handling of exceptions by Julia or the user

Comments

@rohanmclure
Copy link

rohanmclure commented Jul 25, 2019

Not sure if this is an extension of #10405 or a novel issue, but the following example provides two tasks, one of which will throw an error, the other is blocking. This makes async code difficult to debug.

c = Channel(0)
@sync begin
       @async begin
               put!(c, nothing)
       end
       @async begin
               this_is_not_defined_and_will_error()
               take!(c)
       end
end

Interestingly, reversing the @async blocks will cause the error to propagate properly.

I've verified this is the case in 1.1.0 and 1.3.0-alpha

@vchuravy
Copy link
Member

vchuravy commented Jul 28, 2019

This is likely due to the fact that we wait in order on the tasks, instead of peeking if one has errored.

edit: In Go there is ErrorGroup, https://godoc.org/golang.org/x/sync/errgroup which is close to the behaviour one might want

@c42f
Copy link
Member

c42f commented Aug 9, 2019

go ErrorGroup seems about right — it wraps each function so that a failure will cancel the whole group:

https://github.com/golang/sync/blob/112230192c580c3556b8cee6403af37a4fc5f28c/errgroup/errgroup.go#L54

That seems like a better default behavior for @sync. Maybe we could have some syntax in @sync to set a cancellation policy

@sync cancellation=foo begin
    @async f1()
    @async f2()
end

though I'm not sure what kind of thing foo should be.

@StefanKarpinski
Copy link
Sponsor Member

cc @tkf who has looked into cancellation and Trio-style structured I/O a bit already.

@tkf
Copy link
Member

tkf commented Aug 10, 2019

It looks like the problem here is that the code in the OP is mixing two synchronization paradigms (structured concurrency (@sync-@async) and CSP (unbuffered Channel)) that are not aware of each other. I think this can be solved by attaching Channels (and maybe any kind of synchronization mechanisms) to the @sync block. For example, a very simple minded solution would be to close all Channels attached to a @sync block at the end of it. So you would write something like

@sync begin
    c = @Channel(0)
    ...do stuff...
end

which is expanded to

let tasks = [],  # we call it `sync_varname` now
    channels = []

    ans = begin
        c = Channel(0)
        push!(channels, c)

        ...do stuff...
    end

    Base.sync_end(tasks, channels)

    ans
end

(Or maybe use task_local_storage instead of let? This way, Channel does not have to be a macro.)

In Base.sync_end(tasks, channels), it should not iterate tasks sequentially but rather process the tasks as they complete. Once one of the task fail, it should close all channels.

Of course, closeing channels is not ideal because it makes error handling hard; you have to distinguish errors due to the automatic close in @sync with other ones. It would be nice if there are variants of take!/put! like

maybe_take!(c::Channel{T}) :: Union{Some{T}, Cancelled}
maybe_put!(c::Channel) :: Union{Nothing, Cancelled}

where close makes maybe_*!(c) return a Cancelled immediately. This way, I can write a helper macro (say) @await such that

y = @await maybe_take!(c)

expands to

x = maybe_take!(c)
x isa Cancelled && return x
x = something(y)

This way, you can create a convention that @await-able functions are the ones which return Union{T, Cancelled}. Furthermore, the @await-able functions that check the cancellation tokens associated with a @sync block would be the cancellable functions. This solves my concern in #6283 (comment) about making functions cancellable.

(A more sophisticated version of @await can be based on ResultTypes.jl.)

@tro3
Copy link

tro3 commented Dec 4, 2020

Is this issue on the drawing board for 1.6 or further in the future? I'd be happy to help here over the holidays, if needed.

@StefanKarpinski
Copy link
Sponsor Member

The 1.6 release is branching right about now so no more features. If you want to work on this for 1.7 though, that would be great.

tro3 pushed a commit to tro3/julia that referenced this issue Dec 12, 2020
@tro3
Copy link

tro3 commented Dec 16, 2020

@StefanKarpinski - so I have a fix for this (way simplified from the above), based on parallelizing sync_end. It does not address the larger concurrency issues, though. It just prevents the task-order-based lockup. I've tested against the full suites in the 1.5-release and 1.6-release branches.

Would it make more sense to issue a PR into Experimental, or directly into task.jl? Or is this considered too delicate a PR for a non-core developer (I won't take offense)?

@StefanKarpinski
Copy link
Sponsor Member

I'm not sure about that. @vtjnash or @Keno would have to offer an opinion on the technical aspect.

@tro3
Copy link

tro3 commented Dec 25, 2020

So after wandering down a few different paths and getting feedback on #38916 and #38992, I came to the conclusion that the solution to this problem was (an inferior implementation of) what is already in Experimental.sync_end. So I'm just going to point to that in my startup.jl until it moves to Base, which IMO it should do. Happy to do the PR for this and add a lockup test suite if needed.

@User-764Q
Copy link

Hi this is a bit over my head, but I tried it both ways round today on Julia 1.6.2 and it errored both times.

`
julia> @sync begin
@async begin
take!(c)
end
@async begin

                  put!(c, nothing)
          end
   end

ERROR: TaskFailedException

nested task error: UndefVarError: c not defined
Stacktrace:
 [1] macro expansion
   @ ./REPL[1]:3 [inlined]
 [2] (::var"#5#7")()
   @ Main ./task.jl:411

...and 1 more exception.

Stacktrace:
[1] sync_end(c::Channel{Any})
@ Base ./task.jl:369
[2] top-level scope
@ task.jl:388

`

`

julia> @sync begin
@async begin
put!(c, nothing)
end
@async begin

                  take!(c)
          end
   end

ERROR: TaskFailedException

nested task error: UndefVarError: c not defined
Stacktrace:
 [1] macro expansion
   @ ./REPL[1]:3 [inlined]
 [2] (::var"#1#3")()
   @ Main ./task.jl:411

...and 1 more exception.

Stacktrace:
[1] sync_end(c::Channel{Any})
@ Base ./task.jl:369
[2] top-level scope
@ task.jl:388

`

@vtjnash
Copy link
Sponsor Member

vtjnash commented Feb 3, 2024

Seems this is now Experimental.sync_end, which we should move to Future.sync_end, and eventually Base.sync_end

@vtjnash vtjnash closed this as completed Feb 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
error handling Handling of exceptions by Julia or the user
Projects
None yet
Development

No branches or pull requests

9 participants