From 791b3ff826896693d8498c0358ca2f1a4f03f030 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Thu, 30 Apr 2020 18:44:43 -0400 Subject: [PATCH] [BEAM-9860] Require job_endpoint when using PortableRunner. --- CHANGES.md | 2 +- .../runners/portability/job_server.py | 78 ------------------- .../runners/portability/portable_runner.py | 11 +-- 3 files changed, 5 insertions(+), 86 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e03466d30108c..acc020110a491 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,7 +65,7 @@ ## Breaking Changes -* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* The Python SDK now requires `--job_endpoint` to be set when using `--runner=PortableRunner` ([BEAM-9860](https://issues.apache.org/jira/browse/BEAM-9860)). Users seeking the old default behavior should set `--runner=FlinkRunner` instead. ## Deprecations diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 66c43c675b4a2..ec40ae843b648 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -22,8 +22,6 @@ import atexit import shutil import signal -import subprocess -import sys import tempfile import threading @@ -158,79 +156,3 @@ def subprocess_cmd_and_endpoint(self): job_port, self._artifact_port, self._expansion_port, artifacts_dir)), 'localhost:%s' % job_port) - - -class DockerizedJobServer(SubprocessJobServer): - """ - Spins up the JobServer in a docker container for local execution. - """ - def __init__( - self, - job_host="localhost", - job_port=None, - artifact_port=None, - expansion_port=None, - harness_port_range=(8100, 8200), - max_connection_retries=5): - super(DockerizedJobServer, self).__init__() - self.job_host = job_host - self.job_port = job_port - self.expansion_port = expansion_port - self.artifact_port = artifact_port - self.harness_port_range = harness_port_range - self.max_connection_retries = max_connection_retries - - def subprocess_cmd_and_endpoint(self): - # TODO This is hardcoded to Flink at the moment but should be changed - job_server_image_name = "apache/beam_flink%s_job_server:latest" % ( - pipeline_options.FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]) - docker_path = subprocess.check_output(['which', - 'docker']).strip().decode('utf-8') - cmd = [ - "docker", - "run", - # We mount the docker binary and socket to be able to spin up - # "sibling" containers for the SDK harness. - "-v", - ':'.join([docker_path, "/bin/docker"]), - "-v", - "/var/run/docker.sock:/var/run/docker.sock" - ] - - self.job_port, self.artifact_port, self.expansion_port = ( - subprocess_server.pick_port( - self.job_port, self.artifact_port, self.expansion_port)) - - args = [ - '--job-host', - self.job_host, - '--job-port', - str(self.job_port), - '--artifact-port', - str(self.artifact_port), - '--expansion-port', - str(self.expansion_port) - ] - - if sys.platform == "darwin": - # Docker-for-Mac doesn't support host networking, so we need to explictly - # publish ports from the Docker container to be able to connect to it. - # Also, all other containers need to be aware that they run Docker-on-Mac - # to connect against the internal Docker-for-Mac address. - cmd += ["-e", "DOCKER_MAC_CONTAINER=1"] - cmd += ["-p", "{}:{}".format(self.job_port, self.job_port)] - cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)] - cmd += ["-p", "{}:{}".format(self.expansion_port, self.expansion_port)] - cmd += [ - "-p", - "{0}-{1}:{0}-{1}".format( - self.harness_port_range[0], self.harness_port_range[1]) - ] - else: - # This shouldn't be set for MacOS because it detroys port forwardings, - # even though host networking is not supported on MacOS. - cmd.append("--network=host") - - cmd.append(job_server_image_name) - - return cmd + args, '%s:%s' % (self.job_host, self.job_port) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 10f81e6458f00..b384826b7593d 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -303,13 +303,10 @@ def _create_environment(options): return env_class.from_options(portable_options) def default_job_server(self, options): - # type: (PipelineOptions) -> job_server.JobServer - # TODO Provide a way to specify a container Docker URL - # https://issues.apache.org/jira/browse/BEAM-6328 - if not self._dockerized_job_server: - self._dockerized_job_server = job_server.StopOnExitJobServer( - job_server.DockerizedJobServer()) - return self._dockerized_job_server + raise NotImplementedError( + 'You must specify a --job_endpoint when using --runner=PortableRunner. ' + 'Alternatively, you may specify which portable runner you intend to ' + 'use, such as --runner=FlinkRunner or --runner=SparkRunner.') def create_job_service_handle(self, job_service, options): return JobServiceHandle(job_service, options)