Skip to content

Commit

Permalink
[BEAM-9860] Require job_endpoint when using PortableRunner.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibzib committed Apr 30, 2020
1 parent de53fc3 commit 791b3ff
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 86 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@

## Breaking Changes

* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* The Python SDK now requires `--job_endpoint` to be set when using `--runner=PortableRunner` ([BEAM-9860](https://issues.apache.org/jira/browse/BEAM-9860)). Users seeking the old default behavior should set `--runner=FlinkRunner` instead.

## Deprecations

Expand Down
78 changes: 0 additions & 78 deletions sdks/python/apache_beam/runners/portability/job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import atexit
import shutil
import signal
import subprocess
import sys
import tempfile
import threading

Expand Down Expand Up @@ -158,79 +156,3 @@ def subprocess_cmd_and_endpoint(self):
job_port, self._artifact_port, self._expansion_port,
artifacts_dir)),
'localhost:%s' % job_port)


class DockerizedJobServer(SubprocessJobServer):
  """Job server that is started inside a Docker container for local execution.

  The container is launched through ``docker run``. The host's docker binary
  and docker socket are mounted into it so that "sibling" containers can be
  started for the SDK harness.
  """
  def __init__(
      self,
      job_host="localhost",
      job_port=None,
      artifact_port=None,
      expansion_port=None,
      harness_port_range=(8100, 8200),
      max_connection_retries=5):
    """Records the connection settings used to build the docker command.

    Args:
      job_host: host name passed to the job server and used in the endpoint.
      job_port: port for the job service; passed through
        ``subprocess_server.pick_port`` before use.
      artifact_port: port for the artifact service; handled like job_port.
      expansion_port: port for the expansion service; handled like job_port.
      harness_port_range: (first, last) port pair published on macOS.
      max_connection_retries: stored on the instance; not read by the methods
        of this class.
    """
    super(DockerizedJobServer, self).__init__()
    self.job_host = job_host
    self.job_port = job_port
    self.expansion_port = expansion_port
    self.artifact_port = artifact_port
    self.harness_port_range = harness_port_range
    self.max_connection_retries = max_connection_retries

  def subprocess_cmd_and_endpoint(self):
    """Builds the ``docker run`` command line and the job endpoint.

    Returns:
      A ``(command, endpoint)`` tuple: ``command`` is the argument list that
      starts the job server container and ``endpoint`` is ``"host:port"``.
    """
    # TODO This is hardcoded to Flink at the moment but should be changed
    newest_flink = (
        pipeline_options.FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1])
    image = "apache/beam_flink%s_job_server:latest" % newest_flink

    which_output = subprocess.check_output(['which', 'docker'])
    docker_binary = which_output.strip().decode('utf-8')

    # The docker binary and socket are mounted so the job server can spin up
    # "sibling" containers for the SDK harness.
    docker_run = ["docker", "run"]
    docker_run += ["-v", docker_binary + ":" + "/bin/docker"]
    docker_run += ["-v", "/var/run/docker.sock:/var/run/docker.sock"]

    self.job_port, self.artifact_port, self.expansion_port = (
        subprocess_server.pick_port(
            self.job_port, self.artifact_port, self.expansion_port))

    server_args = ['--job-host', self.job_host]
    server_args += ['--job-port', str(self.job_port)]
    server_args += ['--artifact-port', str(self.artifact_port)]
    server_args += ['--expansion-port', str(self.expansion_port)]

    if sys.platform != "darwin":
      # This must not be set on MacOS because it destroys port forwardings,
      # even though host networking is not supported on MacOS.
      docker_run.append("--network=host")
    else:
      # Docker-for-Mac doesn't support host networking, so the ports have to
      # be published explicitly from the container to be able to connect to
      # it. All other containers also need to know that they run on
      # Docker-for-Mac so they connect to the internal Docker-for-Mac address.
      docker_run.extend(["-e", "DOCKER_MAC_CONTAINER=1"])
      for port in (self.job_port, self.artifact_port, self.expansion_port):
        docker_run.extend(["-p", "{}:{}".format(port, port)])
      first_harness_port = self.harness_port_range[0]
      last_harness_port = self.harness_port_range[1]
      docker_run.extend([
          "-p",
          "{0}-{1}:{0}-{1}".format(first_harness_port, last_harness_port)
      ])

    endpoint = '%s:%s' % (self.job_host, self.job_port)
    return docker_run + [image] + server_args, endpoint
11 changes: 4 additions & 7 deletions sdks/python/apache_beam/runners/portability/portable_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,10 @@ def _create_environment(options):
return env_class.from_options(portable_options)

def default_job_server(self, options):
# type: (PipelineOptions) -> job_server.JobServer
# Supplies the job server used when none is given explicitly. `options` is
# not read by any of the lines visible here.
# TODO Provide a way to specify a container Docker URL
# https://issues.apache.org/jira/browse/BEAM-6328
# The DockerizedJobServer (wrapped in StopOnExitJobServer) is created only
# when self._dockerized_job_server is falsy, then cached on the instance so
# later calls return the same object.
if not self._dockerized_job_server:
self._dockerized_job_server = job_server.StopOnExitJobServer(
job_server.DockerizedJobServer())
return self._dockerized_job_server
# NOTE(review): this text appears to be a rendered commit diff without +/-
# markers. Presumably the lines above are the removed implementation and the
# raise below is its replacement (as written, the raise follows the return
# and would never execute) -- TODO confirm against the repository.
raise NotImplementedError(
'You must specify a --job_endpoint when using --runner=PortableRunner. '
'Alternatively, you may specify which portable runner you intend to '
'use, such as --runner=FlinkRunner or --runner=SparkRunner.')

def create_job_service_handle(self, job_service, options):
return JobServiceHandle(job_service, options)
Expand Down

0 comments on commit 791b3ff

Please sign in to comment.