Distributed.jl bug: ConcurrencyViolationError("lock must be held") #478

Open · schlichtanders opened this issue Mar 5, 2024 · 3 comments

schlichtanders commented Mar 5, 2024

While constructing a simple but fairly complete introductory example, I ran into ConcurrencyViolationError("lock must be held"), even though the official nested-loop example that I adapted it from explicitly says that Dagger does not need locks.

using Dagger: @spawn
using Distributed
# add two further Julia processes, which could also run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes code on all workers
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and worker processes as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
# @sync waits until all enclosed calls to @spawn are ready
df = DataFrame()
@sync for i in 1:1000
    data = @spawn rand(10000)
    for agg in aggregators
        res = @spawn fit!(agg(), data)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)

The full stacktrace:
julia> using Dagger: @spawn

julia> using Distributed
# add two further julia processes which could run on other machines
WARNING: using Distributed.@spawn in module Main conflicts with an existing identifier.

julia> addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere execute code on all machines
2-element Vector{Int64}:
2
3

julia> @everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both Threads and Machines as processes

julia> Dagger.all_processors()

# let's distributes some calculations
Set{Dagger.Processor} with 5 elements:
Dagger.ThreadProc(2, 1)
Dagger.ThreadProc(3, 1)
Dagger.ThreadProc(1, 1)
Dagger.ThreadProc(3, 2)
Dagger.ThreadProc(2, 2)

julia> aggregators = [Mean, Variance, Extrema]
# @sync waits until all enclosed calls to @spawn are ready
3-element Vector{UnionAll}:
Mean
Variance
Extrema

julia> df = DataFrame()
0×0 DataFrame

julia> @sync for i in 1:1000
data = @spawn rand(10000)
for agg in aggregators
res = @spawn fit!(agg(), data)
push!(df, (i=i, aggregator=nameof(agg), result=res))
end
end

julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
ConcurrencyViolationError("lock must be held")
Stacktrace:
[1] concurrency_violation
@ ./condition.jl:8
[2] assert_havelock
@ ./condition.jl:25 [inlined]
[3] assert_havelock
@ ./condition.jl:48 [inlined]
[4] assert_havelock
@ ./condition.jl:72 [inlined]
[5] _wait2
@ ./condition.jl:83
[6] #wait#645
@ ./condition.jl:127
[7] wait
@ ./condition.jl:125 [inlined]
[8] wait_for_conn
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:195
[9] check_worker_state
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:170
[10] send_msg_
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:172
[11] send_msg
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:122 [inlined]
[12] #remotecall_fetch#159
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:460
[13] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:454
[14] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.1+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:492 [inlined]
[15] #181
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:490 [inlined]
[16] forwardkeyerror
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:475
[17] poolget
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:489
[18] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:98
[19] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:96 [inlined]
[20] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:102
[21] #invokelatest#2
@ ./essentials.jl:892 [inlined]
[22] invokelatest
@ ./essentials.jl:889 [inlined]
[23] #166
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1553
Stacktrace:
[1] wait
@ ./task.jl:352 [inlined]
[2] fetch
@ ./task.jl:372 [inlined]
[3] fetch_report
@ ~/.julia/packages/Dagger/Tx54v/src/sch/util.jl:263
[4] do_task
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1562
[5] #143
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk:  Thunk(id=212, fit!(Extrema: n=0 | value=(min = Inf, max = -Inf, nmin = 0, nmax = 0), Thunk[209](rand, ...)))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
[2] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
[3] #fetch#73
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
[4] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
[5] _broadcast_getindex_evalf
@ ./broadcast.jl:709 [inlined]
[6] _broadcast_getindex
@ ./broadcast.jl:682 [inlined]
[7] getindex
@ ./broadcast.jl:636 [inlined]
[8] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1098
[9] restart_copyto_nonleaf!(newdest::Vector{…}, dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, val::Variance{…}, I::Int64, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1089
[10] copyto_nonleaf!(dest::Vector{…}, bc::Base.Broadcast.Broadcasted{…}, iter::Base.OneTo{…}, state::Int64, count::Int64)
@ Base.Broadcast ./broadcast.jl:1105
[11] copy
@ ./broadcast.jl:950 [inlined]
[12] materialize
@ ./broadcast.jl:903 [inlined]
[13] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
[14] materialize!
@ ./broadcast.jl:914 [inlined]
[15] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ Base.Broadcast ./broadcast.jl:911
[16] top-level scope
@ REPL[9]:1
Some type information was truncated. Use `show(err)` to see complete types.

Tried on Julia 1.10.1 and 1.10.2 with Dagger 0.18.8

@schlichtanders (Author) commented:

This might be due to missing thread-safety in Distributed.jl.
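
To illustrate what I suspect (just a sketch, I have not verified that it reproduces the error), something like the following hits Distributed from multiple threads without any Dagger involved, which is roughly what the scheduler does in the stacktrace above when it moves chunks between workers:

# Untested sketch (my assumption of where the race might be), with no Dagger
# involved: exercise Distributed's connection handling from several threads
# at once. Start Julia with e.g. --threads=4.
using Distributed

addprocs(1)            # the connection to the new worker may still be settling
w = first(workers())

# Each task calls remotecall_fetch concurrently. On affected Julia versions,
# check_worker_state/wait_for_conn can end up waiting on a Condition without
# holding its lock, which is exactly what ConcurrencyViolationError reports.
tasks = [Threads.@spawn(remotecall_fetch(myid, w)) for _ in 1:100]
foreach(wait, tasks)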

@schlichtanders (Author) commented:

I now built explicit locking into the example, because I thought the issue might be about concurrent access to the data...

using Dagger: @spawn, @shard
using Distributed
# add two further Julia processes, which could also run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes code on all workers
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and worker processes as processors
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
df = DataFrame()

@everywhere function myfit!(lck, agg, data)
    lock(() -> fit!(agg(), data), lck)
end
# @sync waits until all enclosed calls to @spawn are ready
@sync for i in 1:1000
    data = @spawn rand(10000)
    # This creates one lock per worker. When the task runs on
    # a worker, the matching lock is picked up automatically.
    # Needed for multi-threaded access to the data.
    lck = @shard ReentrantLock()
    for agg in aggregators
        res = @spawn myfit!(lck, agg, data)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)   

Yet I still get a similar ConcurrencyViolationError("lock must be held") error:

julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
ConcurrencyViolationError("lock must be held")
Stacktrace:
[1] concurrency_violation
@ ./condition.jl:8
[2] assert_havelock
@ ./condition.jl:25 [inlined]
[3] assert_havelock
@ ./condition.jl:48 [inlined]
[4] assert_havelock
@ ./condition.jl:72 [inlined]
[5] _wait2
@ ./condition.jl:83
[6] #wait#645
@ ./condition.jl:127
[7] wait
@ ./condition.jl:125 [inlined]
[8] wait_for_conn
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:195
[9] check_worker_state
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/cluster.jl:170
[10] send_msg_
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:172
[11] send_msg
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/messages.jl:122 [inlined]
[12] #remotecall_fetch#159
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:460
[13] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:454
[14] remotecall_fetch
@ ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Distributed/src/remotecall.jl:492 [inlined]
[15] #181
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:490 [inlined]
[16] forwardkeyerror
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:475
[17] poolget
@ ~/.julia/packages/MemPool/dgBSi/src/datastore.jl:489
[18] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:98
[19] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:96 [inlined]
[20] move
@ ~/.julia/packages/Dagger/Tx54v/src/chunks.jl:102
[21] #invokelatest#2
@ ./essentials.jl:892 [inlined]
[22] invokelatest
@ ./essentials.jl:889 [inlined]
[23] #166
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1553
Stacktrace:
[1] wait
@ ./task.jl:352 [inlined]
[2] fetch
@ ./task.jl:372 [inlined]
[3] fetch_report
@ ~/.julia/packages/Dagger/Tx54v/src/sch/util.jl:263
[4] do_task
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1562
[5] #143
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk:  Thunk(id=9, myfit!(Dagger.Shard(Dict{Dagger.Processor, Dagger.Chunk}(OSProc(1) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(1, 6, 0x0000000000000060), OSProc(1), ProcessScope: worker == 1, false), OSProc(2) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(2, 0, 0x0000000000000060), OSProc(2), ProcessScope: worker == 2, false), OSProc(3) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(3, 1, 0x0000000000000060), OSProc(3), ProcessScope: worker== 3, false))), Mean, Thunk[5](rand, ...)))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
[2] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
[3] #fetch#73
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
[4] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
[5] _broadcast_getindex_evalf
@ ./broadcast.jl:709 [inlined]
[6] _broadcast_getindex
@ ./broadcast.jl:682 [inlined]
[7] getindex
@ ./broadcast.jl:636 [inlined]
[8] copy
@ ./broadcast.jl:942 [inlined]
[9] materialize
@ ./broadcast.jl:903 [inlined]
[10] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
[11] materialize!
@ ./broadcast.jl:914 [inlined]
[12] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ Base.Broadcast ./broadcast.jl:911
[13] top-level scope
@ REPL[10]:1
Some type information was truncated. Use `show(err)` to see complete types.

@jpsamaroo (Member) commented:

Yeah, fixing this will require using Julia 1.11 and adding JuliaLang/Distributed.jl#4 to your project to get the appropriate fixes into Distributed. This is very much Distributed's bug and not Dagger's, but I'll keep this issue open anyway in case anyone else stumbles upon it.
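
For anyone who lands here, a rough sketch of what adding an unreleased Distributed.jl to a project might look like on Julia 1.11 (the rev below is only a placeholder; check the PR for the actual branch or commit that carries JuliaLang/Distributed.jl#4):

using Pkg

# On Julia 1.11 the Distributed stdlib can be swapped for an explicit package
# dependency, so the fix can be pulled directly from GitHub. "master" is a
# placeholder; use whichever branch/commit contains JuliaLang/Distributed.jl#4.
Pkg.add(url="https://github.com/JuliaLang/Distributed.jl", rev="master")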

@jpsamaroo jpsamaroo changed the title ConcurrencyViolationError("lock must be held") Distributed.jl bug: ConcurrencyViolationError("lock must be held") Mar 6, 2024