Merge pull request #9434 from amitmurthy/amitm/custom_connect
user defined transports
amitmurthy committed Jan 22, 2015
2 parents cd0ff78 + a2edd64 commit 7fbea9d
Showing 18 changed files with 617 additions and 17 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -37,6 +37,7 @@ $(build_docdir):
@mkdir -p $@/examples
@cp -R doc/devdocs doc/manual doc/stdlib $@
@cp -R examples/*.jl $@/examples/
@cp -R examples/clustermanager $@/examples/

git-submodules:
ifneq ($(NO_GIT), 1)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -156,6 +156,8 @@ Library improvements

* Split `Triangular` type into `UpperTriangular`, `LowerTriangular`, `UnitUpperTriangular` and `UnitLowerTriangular` ([#9779])

* ClusterManager - performance improvements ([#9309]) and support for changing transports ([#9434])

Deprecated or removed
---------------------

11 changes: 9 additions & 2 deletions base/client.jl
@@ -222,8 +222,15 @@ function process_options(args::Vector{UTF8String})
if args[i]=="-q" || args[i]=="--quiet"
quiet = true
elseif args[i]=="--worker"
start_worker()
# doesn't return
worker_arg = (i == length(args)) ? "" : args[i+1]

if worker_arg == "custom"
i += 1
else
start_worker()
# doesn't return
end

elseif args[i]=="--bind-to"
i+=1 # has already been processed
elseif args[i]=="-e" || args[i]=="--eval"
2 changes: 2 additions & 0 deletions base/exports.jl
@@ -30,6 +30,7 @@ export
BitArray,
BitMatrix,
BitVector,
BufferStream,
CartesianIndex,
CartesianRange,
CFILE,
@@ -1218,6 +1219,7 @@ export
nprocs,
nworkers,
pmap,
process_messages,
procs,
put!,
remotecall,
9 changes: 8 additions & 1 deletion base/multi.jl
@@ -303,7 +303,8 @@ function rmprocs(args...; waitfor = 0.0)
else
if haskey(map_pid_wrkr, i)
push!(rmprocset, i)
remote_do(i, exit)
w = map_pid_wrkr[i]
kill(w.manager, i, w.config)
end
end
end
@@ -1286,6 +1287,12 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
(s, s)
end

function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(pid, exit) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
nothing
end

function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = get(config.connect_at)
config.host = rhost
44 changes: 44 additions & 0 deletions base/stream.jl
@@ -960,3 +960,47 @@ mark(x::AsyncStream) = mark(x.buffer)
unmark(x::AsyncStream) = unmark(x.buffer)
reset(x::AsyncStream) = reset(x.buffer)
ismarked(x::AsyncStream) = ismarked(x.buffer)

# BufferStream's are non-OS streams, backed by a regular IOBuffer
type BufferStream <: AsyncStream
buffer::IOBuffer
r_c::Condition
close_c::Condition
is_open::Bool

BufferStream() = new(PipeBuffer(), Condition(), Condition(), true)
end

isopen(s::BufferStream) = s.is_open
close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing)

function wait_readnb(s::BufferStream, nb::Int)
while isopen(s) && nb_available(s.buffer) < nb
wait(s.r_c)
end

(nb_available(s.buffer) < nb) && error("closed BufferStream")
end

function eof(s::BufferStream)
wait_readnb(s,1)
!isopen(s) && nb_available(s.buffer)<=0
end

show(io::IO, s::BufferStream) = print(io,"BufferStream() bytes waiting:",nb_available(s.buffer),", isopen:", s.is_open)

nb_available(s::BufferStream) = nb_available(s.buffer)

function wait_readbyte(s::BufferStream, c::UInt8)
while isopen(s) && search(s.buffer,c) <= 0
wait(s.r_c)
end
end

wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end
start_reading(s::BufferStream) = nothing

write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv)
write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv)
write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv)
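The methods above give `BufferStream` the same blocking read semantics as other `AsyncStream`s. A quick sketch of how it behaves (0.4-era Julia, matching the definitions in this diff; not part of the commit):

```julia
s = BufferStream()

# A reader task blocks in wait_readbyte until a newline arrives.
reader = @async println("got: ", readline(s))

write(s, "hello\n")   # notify(s.r_c) wakes the blocked reader
wait(reader)

close(s)              # wakes any remaining waiters; isopen(s) is now false
```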

105 changes: 92 additions & 13 deletions doc/manual/parallel-computing.rst
@@ -674,21 +674,46 @@ retained.
ClusterManagers
---------------

Julia worker processes can also be spawned on arbitrary machines,
enabling Julia's natural parallelism to function quite transparently
in a cluster environment. The :class:`ClusterManager` interface provides a
way to specify a means to launch and manage worker processes.
The launching, management and networking of julia processes into a logical
cluster are done via cluster managers. A ``ClusterManager`` is responsible for

Thus, a custom cluster manager would need to:
- launching worker processes in a cluster environment
- managing events during the lifetime of each worker
- optionally, a cluster manager can also provide data transport

- be a subtype of the abstract :class:`ClusterManager`
- implement :func:`launch`, a method responsible for launching new workers
- implement :func:`manage`, which is called at various events during a worker's lifetime
A julia cluster has the following characteristics:
- The initial julia process, also called the ``master``, is special and has a julia id of 1.
- Only the ``master`` process can add or remove worker processes.
- All processes can directly communicate with each other.

Connections between workers (using the in-built TCP/IP transport) are established in the following manner:
- ``addprocs`` is called on the master process with a ``ClusterManager`` object
- ``addprocs`` calls the appropriate ``launch`` method which spawns the required
  number of worker processes on the appropriate machines
- Each worker starts listening on a free port and writes out its host and port information to STDOUT
- The cluster manager captures the stdout of each worker and makes it available to the master process
- The master process parses this information and sets up TCP/IP connections to each worker
- Every worker is also notified of other workers in the cluster
- Each worker connects to all workers whose julia id is less than its own id
- In this way a mesh network is established, wherein every worker is directly connected with every other worker


While the default transport layer uses plain TCP sockets, it is possible for a julia cluster to provide
its own transport.

Julia provides two in-built cluster managers:

- :class:`LocalManager`, used when :func:`addprocs` or :func:`addprocs(::Integer) <addprocs>` are called
- :class:`SSHManager`, used when :func:`addprocs(::Array) <addprocs>` is called with a list of hostnames
- ``LocalManager``, used when :func:`addprocs` or :func:`addprocs(np::Integer) <addprocs>` are called
- ``SSHManager``, used when :func:`addprocs(hostnames::Array) <addprocs>` is called with a list of hostnames

:class:`LocalManager` is used to launch additional workers on the same host, thereby leveraging multi-core
and multi-processor hardware.
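The choice between the two built-in managers is driven purely by which ``addprocs`` method is called; for example (hostnames are illustrative):

```julia
addprocs(2)                      # LocalManager: two extra workers on this host
addprocs(["node01", "node02"])   # SSHManager: one worker per hostname, over ssh
```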

Thus, a minimal cluster manager would need to:

- be a subtype of the abstract :class:`ClusterManager`
- implement :func:`launch`, a method responsible for launching new workers
- implement :func:`manage`, which is called at various events during a worker's lifetime
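Those three requirements amount to a skeleton like the following (a hedged sketch in 0.4-era Julia; ``FooManager`` is hypothetical and the process-launch mechanics are deliberately elided):

```julia
immutable FooManager <: ClusterManager
    np::Integer   # number of workers to launch
end

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    for _ in 1:manager.np
        wconfig = WorkerConfig()
        # ... start a worker process here and fill in the wconfig fields ...
        push!(launched, wconfig)
        notify(c)   # let addprocs process this worker while others launch
    end
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    # called with op in (:register, :interrupt, :deregister, :finalize)
end
```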

:func:`addprocs(manager::FooManager) <addprocs>` requires ``FooManager`` to implement::

@@ -730,8 +755,8 @@ argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to enable other workers
to connect to it at the specified ``bind_addr`` and ``port``. Useful for multi-homed hosts.


For every worker launched, the :func:`launch` method must add a :clas`WorkerConfig`
object with appropriate fields initialized to ``launched`` ::
For every worker launched, the :func:`launch` method must add a :class:`WorkerConfig`
object (with appropriate fields initialized) to ``launched`` ::

type WorkerConfig
# Common fields relevant to all cluster managers
@@ -753,6 +778,8 @@ object with appropriate fields initialized to ``launched`` ::
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}

connect_at::Nullable{Any}

.....
end

@@ -778,7 +805,6 @@ required to connect to the workers from the master process.
``userdata`` is provided for custom cluster managers to store their own worker specific information.



:func:`manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) <manage>` is called at different
times during the worker's lifetime with different ``op`` values:

@@ -789,6 +815,59 @@ times during the worker's lifetime with different ``op`` values:
interrupt signal.
- with ``:finalize`` for cleanup purposes.
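In code, ``manage`` is usually a small dispatch on ``op`` (a sketch; ``FooManager`` is hypothetical and each branch body depends entirely on the manager):

```julia
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op == :register
        # worker `id` is fully connected and part of the cluster
    elseif op == :interrupt
        # forward an interrupt request to worker `id`
    elseif op == :deregister
        # worker `id` is being removed from the cluster
    elseif op == :finalize
        # release any manager-held resources for worker `id`
    end
end
```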


Cluster Managers with custom transports
---------------------------------------

Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved.
Each julia process has as many communication tasks as the workers it is connected to. For example, consider a julia cluster of
32 processes in an all-to-all mesh network:

- Each julia process thus has 31 communication tasks
- Each task handles all incoming messages from a single remote worker in a message processing loop
- The message processing loop waits on an ``AsyncStream`` object - for example, a TCP socket in the default implementation, reads an entire
message, processes it and waits for the next one
- Sending messages to a process is done directly from any julia task - not just communication tasks - again, via the appropriate
``AsyncStream`` object

Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate
``AsyncStream`` objects that the message processing loops can wait on. The manager specific callbacks to be implemented are::

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

The default implementation, which uses TCP/IP sockets, is provided by ``connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)``.

``connect`` should return a pair of ``AsyncStream`` objects, one for reading data sent from worker ``pid``,
and the other to write data that needs to be sent to worker ``pid``. Custom cluster managers can use an in-memory ``BufferStream``
as the plumbing to proxy data between the custom, possibly non-AsyncStream transport and julia's in-built parallel infrastructure.

A ``BufferStream`` is an in-memory ``IOBuffer`` which behaves like an ``AsyncStream``.
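Put together, a custom ``connect`` typically creates a pair of ``BufferStream``s, starts tasks that shuttle bytes between them and the real transport, and returns the pair to the messaging layer. A hedged sketch (``FooManager`` and the ``transport_send``/``transport_recv`` helpers are hypothetical):

```julia
function connect(manager::FooManager, pid::Int, config::WorkerConfig)
    read_stream = BufferStream()   # messages arriving from worker `pid`
    write_stream = BufferStream()  # messages to be sent to worker `pid`

    @async while isopen(read_stream)
        data = transport_recv(manager, pid)   # hypothetical transport call
        write(read_stream, data)              # wakes the message-processing loop
    end
    @async while isopen(write_stream)
        data = readavailable(write_stream)    # bytes queued by the messaging layer
        transport_send(manager, pid, data)    # hypothetical transport call
    end

    (read_stream, write_stream)
end
```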

Folder ``examples/clustermanager/0mq`` is an example of using ZeroMQ to connect julia workers in a star network with a 0MQ broker in the middle.
Note: The julia processes are still all *logically* connected to each other - any worker can message any other worker directly without any
awareness of 0MQ being used as the transport layer.

When using custom transports:
- julia workers must be started with arguments ``--worker custom``. Just ``--worker`` will result in the newly launched
  workers defaulting to the socket transport implementation
- For every logical connection with a worker, :func:`process_messages(rd::AsyncStream, wr::AsyncStream)` must be called.
  This launches a new task that handles reading and writing of messages from/to the worker represented by the ``AsyncStream`` objects
- :func:`init_worker(manager::FooManager)` must be called as part of worker process initialization
- Field ``connect_at::Any`` in :class:`WorkerConfig` can be set by the cluster manager when ``launch`` is called. The value of
  this field is passed to all ``connect`` callbacks. Typically, it carries information on *how to connect* to a worker. For example,
  the TCP/IP socket transport uses this field to specify the ``(host, port)`` tuple at which to connect to a worker


``kill(manager, pid, config)`` is called to remove a worker from the cluster.
On the master process, the corresponding ``AsyncStream`` objects must be closed by the implementation to ensure proper cleanup. The default
implementation simply executes an ``exit()`` call on the specified remote worker.
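A transport-aware ``kill`` typically combines both steps: request the remote exit and close the proxy streams stashed at ``connect`` time (a sketch; keeping the streams in ``userdata`` is one possible convention, not something the API mandates):

```julia
function kill(manager::FooManager, pid::Int, config::WorkerConfig)
    remote_do(pid, exit)                # ask the worker to shut down
    (r_s, w_s) = get(config.userdata)   # streams saved by our connect()
    close(r_s)                          # unblocks the message-processing loop
    close(w_s)
    nothing
end
```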

``examples/clustermanager/simple`` is an example that shows a simple implementation using Unix domain sockets for cluster setup.




.. rubric:: Footnotes

.. [#mpi2rma] In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see http:https://www.mpi-forum.org/docs.
2 changes: 1 addition & 1 deletion doc/stdlib/parallel.rst
@@ -101,7 +101,7 @@ General Parallel Computing Support
.. function:: addprocs(n::Integer; exeflags=``) -> List of process identifiers

Launches workers using the in-built ``LocalManager`` which only launches workers on the local host.
This can be used to take advantage of multiple cores. `addprocs(4)`` will add 4 processes on the local machine.
This can be used to take advantage of multiple cores. ``addprocs(4)`` will add 4 processes on the local machine.

.. function:: addprocs() -> List of process identifiers

27 changes: 27 additions & 0 deletions examples/clustermanager/0mq/README
@@ -0,0 +1,27 @@
This is a proof-of-concept that uses ZeroMQ as transport.
It uses a star topology as opposed to the native mesh network.

Package ZMQ must be installed. All workers only run on localhost.

All Julia nodes only connect to a "broker" process that listens on known ports
8100 and 8101 via ZMQ sockets.


All commands must be run from the `examples/clustermanager/0mq` directory.

First, start the broker. In a new console type:
julia broker.jl

This does not return.

Next, start a Julia REPL and type:
include("ZMQCM.jl")
ZMQCM.start_master(4) # start with four workers


Alternatively, head.jl, a test script, can be run. It launches the requested number of workers,
executes a simple command on all of them and exits.
    julia head.jl 4

NOTE: As stated this is a proof-of-concept. A real Julia cluster using ZMQ will probably use
different ZMQ socket types and optimize the transport.