Skip to content

Commit

Permalink
refactor asynchronous addition of workers
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jun 10, 2015
1 parent 057f573 commit e554039
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 299 deletions.
24 changes: 24 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,27 @@ function unlock(rl::ReentrantLock)
end
return rl
end

type Semaphore
sem_size::Int
curr_cnt::Int
cond_wait::Condition
Semaphore(sem_size) = new(sem_size, 0, Condition())
end

function acquire(s::Semaphore)
while true
if s.curr_cnt < s.sem_size
s.curr_cnt = s.curr_cnt + 1
return
else
wait(s.cond_wait)
end
end
end

function release(s::Semaphore)
s.curr_cnt = s.curr_cnt - 1
notify(s.cond_wait; all=false)
end

14 changes: 13 additions & 1 deletion base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ end
immutable DefaultClusterManager <: ClusterManager
end

const tunnel_hosts_map = Dict{AbstractString, Semaphore}()

function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
if !isnull(config.connect_at)
# this is a worker-to-worker setup call.
Expand Down Expand Up @@ -245,8 +247,18 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
end

if tunnel
if !haskey(tunnel_hosts_map, pubhost)
tunnel_hosts_map[pubhost] = Semaphore(get(config.max_parallel, typemax(Int)))
end
sem = tunnel_hosts_map[pubhost]

sshflags = get(config.sshflags)
(s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
acquire(sem)
try
(s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
finally
release(sem)
end
else
(s, bind_addr) = connect_to_worker(bind_addr, port)
end
Expand Down
Loading

0 comments on commit e554039

Please sign in to comment.