diff --git a/Makefile b/Makefile
index 337f7fc13a5f2..f9c339e02d29b 100644
--- a/Makefile
+++ b/Makefile
@@ -37,6 +37,7 @@ $(build_docdir):
 	@mkdir -p $@/examples
 	@cp -R doc/devdocs doc/manual doc/stdlib $@
 	@cp -R examples/*.jl $@/examples/
+	@cp -R examples/clustermanager $@/examples/
 
 git-submodules:
 ifneq ($(NO_GIT), 1)
diff --git a/NEWS.md b/NEWS.md
index 3e4a15ceaf01b..4a72139a7f81b 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -156,6 +156,8 @@ Library improvements
   * Split `Triangular` type into `UpperTriangular`, `LowerTriangular`,
     `UnitUpperTriangular` and `UnitLowerTriangular` ([#9779])
 
+  * ClusterManager - performance improvements ([#9309]) and support for changing transports ([#9434])
+
 Deprecated or removed
 ---------------------
diff --git a/base/client.jl b/base/client.jl
index 3faa2640f647f..e76118ed7a191 100644
--- a/base/client.jl
+++ b/base/client.jl
@@ -222,8 +222,15 @@ function process_options(args::Vector{UTF8String})
         if args[i]=="-q" || args[i]=="--quiet"
             quiet = true
         elseif args[i]=="--worker"
-            start_worker()
-            # doesn't return
+            worker_arg = (i == length(args)) ? "" : args[i+1]
+
+            if worker_arg == "custom"
+                i += 1
+            else
+                start_worker()
+                # doesn't return
+            end
+
         elseif args[i]=="--bind-to"
             i+=1 # has already been processed
         elseif args[i]=="-e" || args[i]=="--eval"
diff --git a/base/exports.jl b/base/exports.jl
index df72a4e05f70a..e9a943c4da04a 100644
--- a/base/exports.jl
+++ b/base/exports.jl
@@ -30,6 +30,7 @@ export
     BitArray,
     BitMatrix,
     BitVector,
+    BufferStream,
     CartesianIndex,
     CartesianRange,
     CFILE,
@@ -1218,6 +1219,7 @@ export
     nprocs,
     nworkers,
     pmap,
+    process_messages,
     procs,
     put!,
     remotecall,
diff --git a/base/multi.jl b/base/multi.jl
index adca59f51de42..b0c3d53959d29 100644
--- a/base/multi.jl
+++ b/base/multi.jl
@@ -303,7 +303,8 @@ function rmprocs(args...; waitfor = 0.0)
         else
             if haskey(map_pid_wrkr, i)
                 push!(rmprocset, i)
-                remote_do(i, exit)
+                w = map_pid_wrkr[i]
+                kill(w.manager, i, w.config)
             end
         end
     end
@@ -1286,6 +1287,12 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
     (s, s)
 end
 
+function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
+    remote_do(pid, exit) # For TCP-based transports this will result in the socket being
+                         # closed at our end, which results in a cleanup of the worker.
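+                         # Custom transports should override this method and also close
+                         # the associated AsyncStream objects to ensure proper cleanup.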
+    nothing
+end
+
 function connect_w2w(pid::Int, config::WorkerConfig)
     (rhost, rport) = get(config.connect_at)
     config.host = rhost
diff --git a/base/stream.jl b/base/stream.jl
index 8ea863cd38afd..fa497998ab5e6 100644
--- a/base/stream.jl
+++ b/base/stream.jl
@@ -960,3 +960,47 @@ mark(x::AsyncStream) = mark(x.buffer)
 unmark(x::AsyncStream) = unmark(x.buffer)
 reset(x::AsyncStream) = reset(x.buffer)
 ismarked(x::AsyncStream) = ismarked(x.buffer)
+
+# BufferStreams are non-OS streams, backed by a regular IOBuffer
+type BufferStream <: AsyncStream
+    buffer::IOBuffer
+    r_c::Condition
+    close_c::Condition
+    is_open::Bool
+
+    BufferStream() = new(PipeBuffer(), Condition(), Condition(), true)
+end
+
+isopen(s::BufferStream) = s.is_open
+close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing)
+
+function wait_readnb(s::BufferStream, nb::Int)
+    while isopen(s) && nb_available(s.buffer) < nb
+        wait(s.r_c)
+    end
+
+    (nb_available(s.buffer) < nb) && error("closed BufferStream")
+end
+
+function eof(s::BufferStream)
+    wait_readnb(s, 1)
+    !isopen(s) && nb_available(s.buffer) <= 0
+end
+
+show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", nb_available(s.buffer), ", isopen:", s.is_open)
+
+nb_available(s::BufferStream) = nb_available(s.buffer)
+
+function wait_readbyte(s::BufferStream, c::UInt8)
+    while isopen(s) && search(s.buffer, c) <= 0
+        wait(s.r_c)
+    end
+end
+
+wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end
+start_reading(s::BufferStream) = nothing
+
+write(s::BufferStream, b::UInt8) = (rv = write(s.buffer, b); notify(s.r_c; all=true); rv)
+write{T}(s::BufferStream, a::Array{T}) = (rv = write(s.buffer, a); notify(s.r_c; all=true); rv)
+write(s::BufferStream, p::Ptr, nb::Integer) = (rv = write(s.buffer, p, nb); notify(s.r_c; all=true); rv)
+
diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst
index 522d71f78abbb..bd36495753414 100644
--- a/doc/manual/parallel-computing.rst
+++ b/doc/manual/parallel-computing.rst
@@ -674,21 +674,46 @@ retained.
 ClusterManagers
 ---------------
 
-Julia worker processes can also be spawned on arbitrary machines,
-enabling Julia's natural parallelism to function quite transparently
-in a cluster environment. The :class:`ClusterManager` interface provides a
-way to specify a means to launch and manage worker processes.
+The launching, management, and networking of Julia processes into a logical
+cluster are done via cluster managers. A ``ClusterManager`` is responsible for
 
-Thus, a custom cluster manager would need to:
+- launching worker processes in a cluster environment
+- managing events during the lifetime of each worker
+- optionally, providing data transport
 
-- be a subtype of the abstract :class:`ClusterManager`
-- implement :func:`launch`, a method responsible for launching new workers
-- implement :func:`manage`, which is called at various events during a worker's lifetime
+A Julia cluster has the following characteristics:
+
+- The initial Julia process, also called the ``master``, is special and has a Julia id of 1.
+- Only the ``master`` process can add or remove worker processes.
+- All processes can directly communicate with each other.
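+
+For example (a minimal sketch; the printed ids assume a freshly started
+cluster)::
+
+    julia> addprocs(2)       # only valid on the master, which has id 1
+    2-element Array{Int64,1}:
+     2
+     3
+
+    julia> remotecall_fetch(2, myid)
+    2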
+
+Connections between workers (using the in-built TCP/IP transport) are established in the following manner:
+
+- ``addprocs`` is called on the master process with a ``ClusterManager`` object
+- ``addprocs`` calls the appropriate ``launch`` method, which spawns the required
+  number of worker processes on the appropriate machines
+- Each worker starts listening on a free port and writes out its host and port information to STDOUT
+- The cluster manager captures the STDOUT of each worker and makes it available to the master process
+- The master process parses this information and sets up TCP/IP connections to each worker
+- Every worker is also notified of other workers in the cluster
+- Each worker connects to all workers whose Julia id is less than its own id
+- In this way a mesh network is established, wherein every worker is directly connected to every other worker
+
+
+While the default transport layer uses plain TCP sockets, it is possible for a Julia cluster to provide
+its own transport.
 
 Julia provides two in-built cluster managers:
 
-- :class:`LocalManager`, used when :func:`addprocs` or :func:`addprocs(::Integer) <addprocs>` are called
-- :class:`SSHManager`, used when :func:`addprocs(::Array) <addprocs>` is called with a list of hostnames
+- ``LocalManager``, used when :func:`addprocs` or :func:`addprocs(np::Integer) <addprocs>` are called
+- ``SSHManager``, used when :func:`addprocs(hostnames::Array) <addprocs>` is called with a list of hostnames
+
+:class:`LocalManager` is used to launch additional workers on the same host, thereby leveraging multi-core
+and multi-processor hardware.
+
+Thus, a minimal cluster manager would need to:
+
+- be a subtype of the abstract :class:`ClusterManager`
+- implement :func:`launch`, a method responsible for launching new workers
+- implement :func:`manage`, which is called at various events during a worker's lifetime
 
 :func:`addprocs(manager::FooManager) <addprocs>` requires ``FooManager`` to implement::
 
@@ -730,8 +755,8 @@ argument. Optionally ``--bind-to bind_addr[:port]`` may also be specified to ena
 to connect to it at the specified ``bind_addr`` and ``port``. Useful for
 multi-homed hosts.
 
-For every worker launched, the :func:`launch` method must add a :clas`WorkerConfig`
-object with appropriate fields initialized to ``launched`` ::
+For every worker launched, the :func:`launch` method must add a :class:`WorkerConfig`
+object (with appropriate fields initialized) to ``launched`` ::
 
   type WorkerConfig
       # Common fields relevant to all cluster managers
      host::Nullable{AbstractString}
      port::Nullable{Integer}
 
      # Used when launching additional workers at a host
      count::Nullable{Union(Int, Symbol)}
      exename::Nullable{AbstractString}
      exeflags::Nullable{Cmd}
 
      # External cluster managers can use this to store information at a per-worker level
      # Can be a dict if multiple fields need to be stored.
      userdata::Nullable{Any}
 
      # SSHManager / SSH tunnel connections to workers
      tunnel::Nullable{Bool}
      bind_addr::Nullable{AbstractString}
      sshflags::Nullable{Cmd}
      max_parallel::Nullable{Integer}
 
+     connect_at::Nullable{Any}
+
      .....
   end
 
@@ -778,7 +805,6 @@ required to connect to the workers from the master process.
 ``userdata`` is provided for custom cluster managers to store their own worker specific information.
 
-
 :func:`manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) <manage>` is called at different
 times during the worker's lifetime with different ``op`` values:
@@ -789,6 +815,59 @@ times during the worker's lifetime with different ``op`` values:
   interrupt signal.
 - with ``:finalize`` for cleanup purposes.
 
+
+Cluster Managers with custom transports
+---------------------------------------
+
+Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved.
+Each Julia process has as many communication tasks as the workers it is connected to.
+For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:
+
+  - Each Julia process thus has 31 communication tasks
+  - Each task handles all incoming messages from a single remote worker in a message-processing loop
+  - The message-processing loop waits on an ``AsyncStream`` object (for example, a TCP socket in the default
+    implementation), reads an entire message, processes it, and waits for the next one
+  - Sending messages to a process is done directly from any Julia task - not just communication tasks -
+    again, via the appropriate ``AsyncStream`` object
+
+Replacing the default transport requires the new implementation to set up connections to remote workers and to provide
+appropriate ``AsyncStream`` objects that the message-processing loops can wait on. The manager-specific callbacks to be
+implemented are::
+
+    connect(manager::FooManager, pid::Integer, config::WorkerConfig)
+    kill(manager::FooManager, pid::Int, config::WorkerConfig)
+
+The default implementation (which uses TCP/IP sockets) is implemented as ``connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)``.
+
+``connect`` should return a pair of ``AsyncStream`` objects, one for reading data sent from worker ``pid``,
+and the other to write data that needs to be sent to worker ``pid``. Custom cluster managers can use an in-memory ``BufferStream``
+as the plumbing to proxy data between the custom, possibly non-AsyncStream transport and Julia's in-built parallel infrastructure.
+
+A ``BufferStream`` is an in-memory ``IOBuffer`` which behaves like an ``AsyncStream``.
+
+The folder ``examples/clustermanager/0mq`` contains an example that uses ZeroMQ to connect Julia workers
+in a star topology with a 0MQ broker in the middle.
+Note: The Julia processes are still all *logically* connected to each other - any worker can message any other worker directly without any
+awareness of 0MQ being used as the transport layer.
+
+When using custom transports:
+
+  - Julia workers must be started with the arguments ``--worker custom``. Just ``--worker`` will result in the newly launched
+    workers defaulting to the socket transport implementation.
+  - For every logical connection with a worker, :func:`process_messages(rd::AsyncStream, wr::AsyncStream)` must be called.
+    This launches a new task that handles the reading and writing of messages from/to the worker represented by the ``AsyncStream`` objects.
+  - :func:`init_worker(manager::FooManager)` must be called as part of worker process initialization.
+  - The field ``connect_at::Any`` in :class:`WorkerConfig` can be set by the cluster manager when ``launch`` is called. The value of
+    this field is passed in all ``connect`` callbacks. Typically, it carries information on *how to connect* to a worker. For example,
+    the TCP/IP socket transport uses this field to specify the ``(host, port)`` tuple at which to connect to a worker.
+
+``kill(manager, pid, config)`` is called to remove a worker from the cluster.
+On the master process, the corresponding ``AsyncStream`` objects must be closed by the implementation to ensure proper cleanup. The default
+implementation simply executes an ``exit()`` call on the specified remote worker.
+
+``examples/clustermanager/simple`` is an example that shows a simple implementation using UNIX domain sockets for cluster setup.
+
+
 .. rubric:: Footnotes
 
 .. [#mpi2rma] In this context, MPI refers to the MPI-1 standard.
    Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively
    referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate
    one-sided communication patterns. For additional information on the latest MPI standard, see
    http://www.mpi-forum.org/docs.
diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst
index f5dc7320a4f37..8cf74a793b93a 100644
--- a/doc/stdlib/parallel.rst
+++ b/doc/stdlib/parallel.rst
@@ -101,7 +101,7 @@ General Parallel Computing Support
 .. function:: addprocs(n::Integer; exeflags=``) -> List of process identifiers
 
     Launches workers using the in-built ``LocalManager`` which only launches workers on the local host.
-    This can be used to take advantage of multiple cores. `addprocs(4)`` will add 4 processes on the local machine.
+    This can be used to take advantage of multiple cores. ``addprocs(4)`` will add 4 processes on the local machine.
 
 .. function:: addprocs() -> List of process identifiers
diff --git a/examples/clustermanager/0mq/README b/examples/clustermanager/0mq/README
new file mode 100644
index 0000000000000..c6bb2cfcfb7ff
--- /dev/null
+++ b/examples/clustermanager/0mq/README
@@ -0,0 +1,27 @@
+This is a proof-of-concept that uses ZeroMQ as transport.
+It uses a star topology as opposed to the native mesh network.
+
+Package ZMQ must be installed. All workers run on localhost only.
+
+All Julia nodes only connect to a "broker" process that listens on known ports
+8100 and 8101 via ZMQ sockets.
+
+
+All commands must be run from the `examples/clustermanager/0mq` directory.
+
+First, start the broker. In a new console type:
+    julia broker.jl
+
+This does not return.
+
+Next, start a Julia REPL and type:
+    include("ZMQCM.jl")
+    ZMQCM.start_master(4)   # start with four workers
+
+
+Alternatively, the test script head.jl can be run. It just launches the requested number of workers,
+executes a simple command on all of them, and exits.
+    julia head.jl 4
+
+NOTE: As stated, this is a proof-of-concept. A real Julia cluster using ZMQ will probably use
+different ZMQ socket types and optimize the transport.
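+
+For reference, every message travels through the broker as four ZMQ frames (a
+sketch of the framing used by send_data/recv_data in ZMQCM.jl):
+
+    [destination zid] [sender zid] [message type] [payload]
+
+where the message type is "J" for Julia payload messages and "Z" for control
+messages (ack requests, acks, and kill requests).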
diff --git a/examples/clustermanager/0mq/ZMQCM.jl b/examples/clustermanager/0mq/ZMQCM.jl
new file mode 100644
index 0000000000000..f7db7ca649d75
--- /dev/null
+++ b/examples/clustermanager/0mq/ZMQCM.jl
@@ -0,0 +1,276 @@
+using ZMQ
+
+import Base: launch, manage, connect, kill
+
+const BROKER_SUB_PORT = 8100
+const BROKER_PUB_PORT = 8101
+
+const SELF_INITIATED = 0
+const REMOTE_INITIATED = 1
+
+const PAYLOAD_MSG = "J"
+const CONTROL_MSG = "Z"
+
+const REQUEST_ACK = "R"
+const ACK_MSG = "A"
+const KILL_MSG = "K"
+
+type ZMQCMan <: ClusterManager
+    map_zmq_julia::Dict{Int, Tuple}
+    c::Condition
+    isfree::Bool
+    ctx
+    pub
+    sub
+    zid_self
+    ZMQCMan() = new(Dict{Int, Tuple}(), Condition(), true)
+end
+
+const manager = ZMQCMan()
+
+# A simple condition-based lock serializes the multipart ZMQ sends.
+function lock_for_send()
+    while manager.isfree == false
+        wait(manager.c)
+    end
+    manager.isfree = false
+end
+
+function release_lock_for_send()
+    manager.isfree = true
+    notify(manager.c, all=true)
+end
+
+function init_node(zid=0)
+    manager.ctx = Context(1)
+    pub=Socket(manager.ctx, PUB)    # Outbound
+    connect(pub, "tcp://127.0.0.1:$BROKER_SUB_PORT")
+
+    sub=Socket(manager.ctx, SUB)    # Inbound
+    connect(sub, "tcp://127.0.0.1:$BROKER_PUB_PORT")
+    ZMQ.set_subscribe(sub, string(zid))
+
+    manager.pub = pub
+    manager.sub = sub
+    manager.zid_self = zid
+
+    (pub, sub)
+end
+
+function send_data(zid, mtype, data)
+    lock_for_send()
+    ZMQ.send(manager.pub, Message(string(zid)), SNDMORE)
+    ZMQ.send(manager.pub, Message(string(manager.zid_self)), SNDMORE)
+    #println("Sending message of type $mtype to $zid")
+    ZMQ.send(manager.pub, Message(mtype), SNDMORE)
+    ZMQ.send(manager.pub, Message(data))
+    release_lock_for_send()
+end
+
+function setup_connection(zid, initiated_by)
+    try
+        read_stream=BufferStream()
+        write_stream=BufferStream()
+
+        if initiated_by == REMOTE_INITIATED
+            test_remote = false
+        else
+            test_remote = true
+        end
+
+        manager.map_zmq_julia[zid] = (read_stream, write_stream, test_remote)
+
+        @schedule begin
+            # Handshake: keep requesting an ack until the peer responds (see recv_data)
+            while true
+                (r_s, w_s, do_test_remote) = manager.map_zmq_julia[zid]
+                if do_test_remote
+                    send_data(zid, CONTROL_MSG, REQUEST_ACK)
+                    sleep(0.5)
+                else
+                    break
+                end
+            end
+            (r_s, w_s, do_test_remote) = manager.map_zmq_julia[zid]
+
+            # Forward anything written to the write stream to the peer
+            while true
+                data = readavailable(w_s)
+                send_data(zid, PAYLOAD_MSG, data)
+            end
+        end
+        (read_stream, write_stream)
+    catch e
+        Base.show_backtrace(STDOUT,catch_backtrace())
+        println(e)
+        rethrow(e)
+    end
+end
+
+# BROKER
+function start_broker()
+    ctx=Context(1)
+    xpub=Socket(ctx, XPUB)
+    xsub=Socket(ctx, XSUB)
+
+    ZMQ.bind(xsub, "tcp://127.0.0.1:$(BROKER_SUB_PORT)")
+    ZMQ.bind(xpub, "tcp://127.0.0.1:$(BROKER_PUB_PORT)")
+
+    ccall((:zmq_proxy, :libzmq), Cint, (Ptr{Void}, Ptr{Void}, Ptr{Void}), xsub.data, xpub.data, C_NULL)
+#    proxy(xsub, xpub)
+
+    # control never comes here
+    ZMQ.close(xpub)
+    ZMQ.close(xsub)
+    ZMQ.close(ctx)
+end
+
+function recv_data()
+    try
+        #println("On $(manager.zid_self) waiting to recv message")
+        zid = int(bytestring(ZMQ.recv(manager.sub)))
+        assert(zid == manager.zid_self)
+
+        from_zid = int(bytestring(ZMQ.recv(manager.sub)))
+        mtype = bytestring(ZMQ.recv(manager.sub))
+
+        #println("$zid received message of type $mtype from $from_zid")
+
+        data = ZMQ.recv(manager.sub)
+        if mtype == CONTROL_MSG
+            cmsg = bytestring(data)
+            if cmsg == REQUEST_ACK
+                #println("$from_zid REQUESTED_ACK from $zid")
+                # send back a control_msg
+                send_data(from_zid, CONTROL_MSG, ACK_MSG)
+            elseif cmsg == ACK_MSG
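+                # The peer answered our REQUEST_ACK; clearing the test flag stops
+                # the handshake loop in setup_connection from re-sending it.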
#println("$zid got ACK_MSG from $from_zid") + (r_s, w_s, test_remote) = manager.map_zmq_julia[from_zid] + manager.map_zmq_julia[from_zid] = (r_s, w_s, false) + elseif cmsg == KILL_MSG + exit(0) + else + error("Unknown control message : ", cmsg) + end + data = "" + end + + (from_zid, data) + catch e + Base.show_backtrace(STDOUT,catch_backtrace()) + println(e) + rethrow(e) + end + +end + +# MASTER +function start_master(np) + init_node() + @schedule begin + try + while true + (from_zid, data) = recv_data() + + #println("master recv data from $from_zid") + + (r_s, w_s, t_r) = manager.map_zmq_julia[from_zid] + write(r_s, convert(Ptr{Uint8}, data), length(data)) + end + catch e + Base.show_backtrace(STDOUT,catch_backtrace()) + println(e) + rethrow(e) + end + end + + addprocs(manager; np=np) +end + + +function launch(manager::ZMQCMan, params::Dict, launched::Array, c::Condition) + #println("launch $(params[:np])") + for i in 1:params[:np] + io, pobj = open (`julia --worker custom worker.jl $i`, "r") + + wconfig = WorkerConfig() + wconfig.userdata = Dict(:zid=>i, :io=>io) + push!(launched, wconfig) + notify(c) + end +end + +function connect(manager::ZMQCMan, pid::Int, config::WorkerConfig) + #println("connect_m2w") + if myid() == 1 + zid = get(config.userdata)[:zid] + config.connect_at = zid # This will be useful in the worker-to-worker connection setup. + + print_worker_stdout(get(config.userdata)[:io], pid) + else + #println("connect_w2w") + zid = get(config.connect_at) + config.userdata = Dict{Symbol, Any}(:zid=>zid) + end + + streams = setup_connection(zid, SELF_INITIATED) + + udata = get(config.userdata) + udata[:streams] = streams + + streams +end + +# WORKER +function start_worker(zid) + #println("start_worker") + Base.init_worker(ZMQCMan()) + init_node(zid) + + while true + (from_zid, data) = recv_data() + + #println("worker recv data from $from_zid") + + streams = get(manager.map_zmq_julia, from_zid, nothing) + if streams == nothing + # First time.. 
+            (r_s, w_s) = setup_connection(from_zid, REMOTE_INITIATED)
+            process_messages(r_s, w_s)
+        else
+            (r_s, w_s, t_r) = streams
+        end
+
+        write(r_s, convert(Ptr{UInt8}, data), length(data))
+    end
+end
+
+function manage(manager::ZMQCMan, id::Int, config::WorkerConfig, op)
+    nothing
+end
+
+function kill(manager::ZMQCMan, pid::Int, config::WorkerConfig)
+    send_data(get(config.userdata)[:zid], CONTROL_MSG, KILL_MSG)
+    (r_s, w_s) = get(config.userdata)[:streams]
+    close(r_s)
+    close(w_s)
+
+    # remove from our map
+    delete!(manager.map_zmq_julia, get(config.userdata)[:zid])
+
+    nothing
+end
+
+
+
+function print_worker_stdout(io, pid)
+    @schedule while !eof(io)
+        line = readline(io)
+        print("\tFrom worker $(pid):\t$line")
+    end
+end
+
diff --git a/examples/clustermanager/0mq/broker.jl b/examples/clustermanager/0mq/broker.jl
new file mode 100644
index 0000000000000..527090c606e66
--- /dev/null
+++ b/examples/clustermanager/0mq/broker.jl
@@ -0,0 +1,2 @@
+include("ZMQCM.jl")
+start_broker()
diff --git a/examples/clustermanager/0mq/head.jl b/examples/clustermanager/0mq/head.jl
new file mode 100644
index 0000000000000..a3eb9c755cf8a
--- /dev/null
+++ b/examples/clustermanager/0mq/head.jl
@@ -0,0 +1,9 @@
+include("ZMQCM.jl")
+
+# @spawn run(`julia broker.jl`)
+
+start_master(int(ARGS[1]))
+
+resp = pmap(x -> myid() * 2, [1:nworkers()])
+
+println(resp)
diff --git a/examples/clustermanager/0mq/worker.jl b/examples/clustermanager/0mq/worker.jl
new file mode 100644
index 0000000000000..abb7712751a83
--- /dev/null
+++ b/examples/clustermanager/0mq/worker.jl
@@ -0,0 +1,3 @@
+include("ZMQCM.jl")
+
+start_worker(int(ARGS[1]))
diff --git a/examples/clustermanager/simple/README b/examples/clustermanager/simple/README
new file mode 100644
index 0000000000000..86906ada08b7b
--- /dev/null
+++ b/examples/clustermanager/simple/README
@@ -0,0 +1,12 @@
+This is a simple proof-of-concept that uses UNIX domain sockets as transport.
+
+All commands must be run from the `examples/clustermanager/simple` directory.
+
+Start a Julia REPL and type:
+    include("UnixDomainCM.jl")
+    addprocs(UnixDomainCM(4))   # start with four workers
+
+Alternatively, the test script head.jl can be run. It just launches the requested number of workers,
+executes a simple command on all of them, and exits.
+    julia head.jl 4
+
diff --git a/examples/clustermanager/simple/UnixDomainCM.jl b/examples/clustermanager/simple/UnixDomainCM.jl
new file mode 100644
index 0000000000000..114d70bdaf479
--- /dev/null
+++ b/examples/clustermanager/simple/UnixDomainCM.jl
@@ -0,0 +1,87 @@
+import Base: launch, manage, connect, exit
+
+type UnixDomainCM <: ClusterManager
+    np::Integer
+end
+
+function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Condition)
+#    println("launch $(manager.np)")
+    for i in 1:manager.np
+        sockname = tempname()
+        try
+            cmd = `$(params[:exename]) --worker custom $(@__FILE__) worker $sockname`
+            io, pobj = open(cmd, "r")
+
+            wconfig = WorkerConfig()
+            wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
+            push!(launched, wconfig)
+            notify(c)
+        catch e
+            println(e)
+        end
+    end
+end
+
+function connect(manager::UnixDomainCM, pid::Int, config::WorkerConfig)
+    if myid() == 1
+#        println("connect_m2w")
+        config.connect_at = get(config.userdata)[:sockname] # This will be useful in the worker-to-worker connection setup.
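+        # connect_at is passed to every other worker during cluster setup, so
+        # workers can later dial this worker's socket themselves.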
+
+        print_worker_stdout(get(config.userdata)[:io], pid)
+    else
+#        println("connect_w2w")
+        sockname = get(config.connect_at)
+        config.userdata = Dict{Symbol, Any}(:sockname=>sockname)
+    end
+
+    t = time()
+    while true
+        try
+            address = get(config.userdata)[:sockname]
+            if isa(address, Tuple)
+                sock = connect(address...)
+            else
+                sock = connect(ascii(address))
+            end
+            return (sock, sock)
+        catch e
+            if (time() - t) > 10.0
+                rethrow(e)
+            else
+                # the worker may not be listening yet; retry for up to 10 seconds
+                gc()
+                sleep(0.1)
+            end
+        end
+    end
+
+end
+
+# WORKER
+function start_worker(sockname)
+    Base.init_worker(UnixDomainCM(0))
+
+    srvr = listen(ascii(sockname))
+    while true
+        sock = accept(srvr)
+        process_messages(sock, sock)
+    end
+end
+
+function manage(manager::UnixDomainCM, id::Int, config::WorkerConfig, op)
+    if op == :deregister
+        rm(get(config.userdata)[:sockname])
+    end
+    nothing
+end
+
+function print_worker_stdout(io, pid)
+    @schedule while !eof(io)
+        line = readline(io)
+        print("\tFrom worker $(pid):\t$line")
+    end
+end
+
+if (length(ARGS) > 0) && (ARGS[1] == "worker")
+    # script has been launched as a worker
+    start_worker(ARGS[2])
+end
diff --git a/examples/clustermanager/simple/head.jl b/examples/clustermanager/simple/head.jl
new file mode 100644
index 0000000000000..6f1297e6d6050
--- /dev/null
+++ b/examples/clustermanager/simple/head.jl
@@ -0,0 +1,5 @@
+include("UnixDomainCM.jl")
+
+addprocs(UnixDomainCM(int(ARGS[1])))
+resp = pmap(x -> myid() * 2, [1:nworkers()])
+println(resp)
diff --git a/examples/clustermanager/simple/test_simple.jl b/examples/clustermanager/simple/test_simple.jl
new file mode 100644
index 0000000000000..bfe159519c947
--- /dev/null
+++ b/examples/clustermanager/simple/test_simple.jl
@@ -0,0 +1,10 @@
+cmanpath = joinpath(dirname(@__FILE__), "UnixDomainCM.jl")
+include(cmanpath)
+
+npids = addprocs(UnixDomainCM(2))
+assert(length(npids) == 2)
+test_pids = [remotecall_fetch(x, myid) for x in npids]
+assert(npids == test_pids)
+rmprocs(npids; waitfor=1.0)
+
+exit(0)
diff --git a/test/examples.jl b/test/examples.jl
index cf0b9fe2b8723..bd12472027504 100644
--- a/test/examples.jl
+++ b/test/examples.jl
@@ -35,9 +35,36 @@ include(joinpath(dir, "queens.jl"))
 @test solve(8, 8, 1) == Array{Int,1}[[1,1]]
 @test solve(8, 8, 7) == Array{Int,1}[[1,1],[2,3],[3,5],[4,2],[5,8],[7,4],[8,7]]
 
+# Different cluster managers do not play well together. Since
+# the test infrastructure already uses LocalManager, we will test the simple
+# cluster manager example through a new Julia session.
+@unix_only begin
+    script = joinpath(dir, "clustermanager/simple/test_simple.jl")
+    cmd = `$(joinpath(JULIA_HOME,Base.julia_exename())) $script`
+    if !success(cmd)
+        error("UnixDomainCM failed test, cmd : $cmd")
+    end
+end
+
 # At least make sure code loads
 include(joinpath(dir, "plife.jl"))
 
 include(joinpath(dir, "preduce.jl"))
 
 include(joinpath(dir, "wordcount.jl"))
+
+# The 0mq cluster manager depends on the ZMQ package. Just make sure the
+# code loads, using a stub module definition for ZMQ if it is not installed.
+zmq_found = true
+try
+    using ZMQ
+catch
+    zmq_found = false
+end
+
+if !zmq_found
+    eval(parse("module ZMQ end"))
+end
+
+include(joinpath(dir, "clustermanager/0mq/ZMQCM.jl"))
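+
+# Note: the 0mq example is only loaded above, not executed; running it would
+# additionally require the real ZMQ package and a separate broker process
+# (see examples/clustermanager/0mq/README).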