Add SSHManager support for invoking Windows workers via cmd.exe (JuliaLang#38353)

Distributed.addprocs() now supports four new keyword arguments: `shell`,
`ssh`, `env` and `cmdline_cookie`.

Specifying `shell=:wincmd` now makes it possible to start workers on a
Windows machine with an sshd server that invokes `cmd.exe` as the shell
(e.g. Microsoft's OpenSSH port does that by default). Previously
SSHManager only supported ssh connections to a POSIX shell.

Specifying `ssh="/usr/bin/ssh"` selects the ssh client that SSHManager
will use (useful for debugging and where a custom version of ssh is
required).

The new `env` parameter makes it possible to pass arbitrary environment
variables to workers.

Specifying `cmdline_cookie=true` is a workaround for an ssh problem
with Windows workers that run older (pre-ConPTY) versions of Windows,
Julia, or OpenSSH.
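
As a rough illustration of how the four keywords described above might be combined, the sketch below collects them in a named tuple and wraps the `addprocs` call in a small helper. The host name, executable path, working directory and depot path are hypothetical placeholders, and `add_windows_workers` is an illustrative helper, not part of Distributed.

```julia
using Distributed

# All concrete values below are hypothetical placeholders for illustration.
win_worker_kwargs = (
    shell = :wincmd,                            # remote sshd starts cmd.exe
    ssh = "/usr/bin/ssh",                       # ssh client used on the local side
    env = ["JULIA_DEPOT_PATH" => "C:\\depot"],  # extra variables set for the worker
    cmdline_cookie = true,                      # workaround for pre-ConPTY setups
    exename = "C:\\Julia\\bin\\julia.exe",      # Julia executable on the worker
    dir = "C:\\Users\\alice",                   # working directory on the worker
)

# Illustrative helper (not part of Distributed): start one worker on `host`.
add_windows_workers(host::AbstractString) =
    addprocs([host]; win_worker_kwargs...)

# Calling the helper needs a reachable Windows host running sshd, e.g.:
# pids = add_windows_workers("alice@winbox")
```

Keeping the keywords in one named tuple makes it easy to flip `cmdline_cookie` back to its default once the remote machine no longer needs the workaround.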
mgkuhn committed Nov 23, 2020
1 parent 17ea0f8 commit bb6a48e
Showing 4 changed files with 87 additions and 16 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -228,6 +228,9 @@ Standard library changes

 #### Distributed

+* Now supports invoking Windows workers via ssh (via new keyword argument `shell=:wincmd` in `addprocs`) ([#30614])
+
+* Other new keyword arguments in `addprocs`: `ssh` to specify the ssh client path, `env` to pass environment variables to workers, and `cmdline_cookie` to work around an ssh problem with Windows workers that run older (pre-ConPTY) versions of Windows, Julia or OpenSSH. ([#30614])

 #### UUIDs

4 changes: 2 additions & 2 deletions stdlib/Distributed/src/Distributed.jl
@@ -13,8 +13,8 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
 using Base: Process, Semaphore, JLOptions, buffer_writes, @sync_add,
             VERSION_STRING, binding_module, atexit, julia_exename,
             julia_cmd, AsyncGenerator, acquire, release, invokelatest,
-            shell_escape_posixly, uv_error, something, notnothing, isbuffered,
-            mapany
+            shell_escape_posixly, shell_escape_wincmd, escape_microsoft_c_args,
+            uv_error, something, notnothing, isbuffered, mapany
 using Base.Threads: Event

 using Serialization, Sockets
6 changes: 5 additions & 1 deletion stdlib/Distributed/src/cluster.jl
@@ -15,7 +15,7 @@ abstract type ClusterManager end
 Type used by [`ClusterManager`](@ref)s to control workers added to their clusters. Some fields
 are used by all cluster managers to access a host:
   * `io` -- the connection used to access the worker (a subtype of `IO` or `Nothing`)
-  * `host` -- the host address (either an `AbstractString` or `Nothing`)
+  * `host` -- the host address (either a `String` or `Nothing`)
   * `port` -- the port on the host used to connect to the worker (either an `Int` or `Nothing`)
 Some are used by the cluster manager to add workers to an already-initialized host:
@@ -515,6 +515,10 @@ end

 default_addprocs_params() = Dict{Symbol,Any}(
     :topology => :all_to_all,
+    :ssh => "ssh",
+    :shell => :posix,
+    :cmdline_cookie => false,
+    :env => [],
     :dir => pwd(),
     :exename => joinpath(Sys.BINDIR::String, julia_exename()),
     :exeflags => ``,
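
To make the role of these defaults concrete, here is a minimal sketch of how user-supplied keywords could override them, and how the `:env` entry is later turned into a `Dict{String,String}` (as `launch_on_machine` does in the managers.jl hunk). It assumes the override happens through `merge`, which this diff does not show; `sketch_defaults` and `resolve_params` are hypothetical names.

```julia
# Hypothetical stand-in for a subset of the defaults listed in the hunk above.
sketch_defaults() = Dict{Symbol,Any}(
    :ssh => "ssh",
    :shell => :posix,
    :cmdline_cookie => false,
    :env => [],
)

# Assumption: user keywords take priority over defaults. `merge` keeps the
# value from the last collection when a key appears in both.
resolve_params(; kwargs...) = merge(sketch_defaults(), Dict{Symbol,Any}(kwargs))

params = resolve_params(shell = :wincmd, env = ["JULIA_DEPOT_PATH" => "/depot"])

# The array of pairs is converted to a dictionary before use.
env = Dict{String,String}(params[:env])
```

An empty `[]` default converts to an empty dictionary, so workers started without `env` receive no additional variables beyond those passed on by default.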
90 changes: 77 additions & 13 deletions stdlib/Distributed/src/managers.jl
@@ -72,11 +72,20 @@ Keyword arguments:
 * `multiplex`: if `true` then SSH multiplexing is used for SSH tunneling. Default is `false`.
+* `ssh`: the name or path of the SSH client executable used to start the workers.
+  Default is `"ssh"`.
 * `sshflags`: specifies additional ssh options, e.g. ```sshflags=\`-i /home/foo/bar.pem\````
 * `max_parallel`: specifies the maximum number of workers connected to in parallel at a
   host. Defaults to 10.
+* `shell`: specifies the type of shell to which ssh connects on the workers.
+    + `shell=:posix`: a POSIX-compatible Unix/Linux shell (bash, sh, etc.). The default.
+    + `shell=:wincmd`: Microsoft Windows `cmd.exe`.
 * `dir`: specifies the working directory on the workers. Defaults to the host's current
   directory (as found by `pwd()`)
@@ -105,8 +114,22 @@ Keyword arguments:
   are setup lazily, i.e. they are setup at the first instance of a remote call between
   workers. Default is true.
+* `env`: provide an array of string pairs such as
+  `env=["JULIA_DEPOT_PATH"=>"/depot"] to request that environment variables
+  are set on the remote machine. By default only the environment variable
+  `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote
+  environment.
+* `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline
+  option. The (more secure) default behaviour of passing the cookie via ssh stdio
+  may hang with Windows workers that use older (pre-ConPTY) Julia or Windows versions,
+  in which case `cmdline_cookie=true` offers a work-around.
+!!! compat "Julia 1.6"
+    The keyword arguments `ssh`, `shell`, `env` and `cmdline_cookie`
+    were added in Julia 1.6.
-Environment variables :
+Environment variables:
 If the master process fails to establish a connection with a newly launched worker within
 60.0 seconds, the worker treats it as a fatal situation and terminates.
@@ -184,11 +207,15 @@ function parse_machine(machine::AbstractString)
 end

 function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, params::Dict, launched::Array, launch_ntfy::Condition)
+    shell = params[:shell]
+    ssh = params[:ssh]
     dir = params[:dir]
     exename = params[:exename]
     exeflags = params[:exeflags]
     tunnel = params[:tunnel]
     multiplex = params[:multiplex]
+    cmdline_cookie = params[:cmdline_cookie]
+    env = Dict{String,String}(params[:env])

     # machine could be of the format [user@]host[:port] bind_addr[:bind_port]
     # machine format string is split on whitespace
Expand All @@ -199,7 +226,11 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags`
end
exeflags = `$exeflags --worker`
if cmdline_cookie
exeflags = `$exeflags --worker=$(cluster_cookie())`
else
exeflags = `$exeflags --worker`
end

host, portnum = parse_machine(machine_bind[1])
portopt = portnum === nothing ? `` : `-p $portnum`
Expand All @@ -210,7 +241,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa
# If it's already running, later ssh sessions also use the same ssh multiplexing session even if
# `multiplex` is not explicitly specified; otherwise the tunneling session launched later won't
# go to background and hang. This is because of OpenSSH implementation.
if success(`ssh $sshflags -O check $host`)
if success(`$ssh $sshflags -O check $host`)
multiplex = true
elseif multiplex
# automatically create an SSH multiplexing session at the next SSH connection
Expand All @@ -221,33 +252,66 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa

# Build up the ssh command

# the default worker timeout
tval = get(ENV, "JULIA_WORKER_TIMEOUT", "")
# pass on some environment variables by default
for var in ["JULIA_WORKER_TIMEOUT"]
if !haskey(env, var) && haskey(ENV, var)
env[var] = ENV[var]
end
end
for var in keys(ENV)
occursin(r"^[a-zA-Z0-9_]+$", var) || throw(ArgumentError(var))
end

# Julia process with passed in command line flag arguments
cmds = """
cd -- $(shell_escape_posixly(dir))
$(isempty(tval) ? "" : "export JULIA_WORKER_TIMEOUT=$(shell_escape_posixly(tval))")
$(shell_escape_posixly(exename)) $(shell_escape_posixly(exeflags))"""
if shell == :posix
# ssh connects to a POSIX shell

# shell login (-l) with string command (-c) to launch julia process
cmd = `sh -l -c $cmds`
cmds = "$(shell_escape_posixly(exename)) $(shell_escape_posixly(exeflags))"
# set environment variables
for (var, val) in env
cmds = "export $(var)=$(shell_escape_posixly(val))\n$cmds"
end
# change working directory
cmds = "cd -- $(shell_escape_posixly(dir))\n$cmds"

# shell login (-l) with string command (-c) to launch julia process
remotecmd = shell_escape_posixly(`sh -l -c $cmds`)

elseif shell == :wincmd
# ssh connects to Windows cmd.exe

any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))

remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
# change working directory
if dir !== nothing && dir != ""
any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
remotecmd = "pushd \"$(dir)\" && $remotecmd"
end
# set environment variables
for (var, val) in env
remotecmd = "set $(var)=$(shell_escape_wincmd(val))&& $remotecmd"
end

else
throw(ArgumentError("invalid shell"))
end

# remote launch with ssh with given ssh flags / host / port information
# -T → disable pseudo-terminal allocation
# -a → disable forwarding of auth agent connection
# -x → disable X11 forwarding
# -o ClearAllForwardings → option if forwarding connections and
# forwarded connections are causing collisions
cmd = `ssh -T -a -x -o ClearAllForwardings=yes $sshflags $host $(shell_escape_posixly(cmd))`
cmd = `$ssh -T -a -x -o ClearAllForwardings=yes $sshflags $host $remotecmd`

# launch the remote Julia process

# detach launches the command in a new process group, allowing it to outlive
# the initial julia process (Ctrl-C and teardown methods are handled through messages)
# for the launched processes.
io = open(detach(cmd), "r+")
write_cookie(io)
cmdline_cookie || write_cookie(io)

wconfig = WorkerConfig()
wconfig.io = io.out
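
The two shell branches in the hunk above can be tried out in isolation. The following is a simplified sketch, not the function from the commit: `sketch_remote_command` is a hypothetical name, flags are passed as a vector of strings rather than a `Cmd`, and the validity checks on `exename` and `dir` are omitted. It assumes Julia 1.6 or later, where the unexported helpers `Base.shell_escape_posixly`, `Base.shell_escape_wincmd` and `Base.escape_microsoft_c_args` are available.

```julia
# Simplified, hypothetical re-creation of how the remote command string is
# assembled for each shell type. Paths and values in the usage lines are made up.
function sketch_remote_command(shell::Symbol, exename::AbstractString,
                               exeflags::Vector{String}, dir::AbstractString,
                               env::Dict{String,String})
    if shell == :posix
        # Start from the Julia invocation, then prepend `export` and `cd` lines.
        cmds = "$(Base.shell_escape_posixly(exename)) $(Base.shell_escape_posixly(exeflags...))"
        for (var, val) in env
            cmds = "export $(var)=$(Base.shell_escape_posixly(val))\n$cmds"
        end
        cmds = "cd -- $(Base.shell_escape_posixly(dir))\n$cmds"
        # Wrap the script in a login shell and escape it once more for ssh.
        return Base.shell_escape_posixly(`sh -l -c $cmds`)
    elseif shell == :wincmd
        # Quote arguments the way Microsoft C runtimes parse them, then escape
        # cmd.exe metacharacters.
        remotecmd = Base.shell_escape_wincmd(
            Base.escape_microsoft_c_args(exename, exeflags...))
        if !isempty(dir)
            remotecmd = "pushd \"$(dir)\" && $remotecmd"
        end
        for (var, val) in env
            # No space before `&&`, so the value gets no trailing blank.
            remotecmd = "set $(var)=$(Base.shell_escape_wincmd(val))&& $remotecmd"
        end
        return remotecmd
    else
        throw(ArgumentError("invalid shell"))
    end
end

env = Dict("JULIA_WORKER_TIMEOUT" => "120")
posix_cmd = sketch_remote_command(:posix, "/usr/bin/julia", ["--worker"], "/home/alice", env)
win_cmd = sketch_remote_command(:wincmd, "C:\\Julia\\bin\\julia.exe", ["--worker"], "C:\\work", env)
println(posix_cmd)
println(win_cmd)
```

Printing both strings side by side shows the main difference: the POSIX variant ships a multi-line script to `sh -l -c`, while the `cmd.exe` variant chains `set`, `pushd` and the Julia invocation with `&&` on a single line.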