[FLINK-17596][python] Move the Python UDF runner script out of the jar of flink-python

This closes apache#12092.
WeiZhong94 authored and Dian Fu committed May 15, 2020
1 parent 91557c8 commit 282da0d
Showing 27 changed files with 301 additions and 586 deletions.
3 changes: 3 additions & 0 deletions flink-dist/src/main/assemblies/bin.xml
@@ -125,6 +125,9 @@ under the License.
			<directory>../flink-python/bin/</directory>
			<outputDirectory>bin</outputDirectory>
			<fileMode>0755</fileMode>
+			<excludes>
+				<exclude>pyflink-udf-runner.*</exclude>
+			</excludes>
		</fileSet>

		<!-- copy SQL client -->
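The added `<excludes>` block keeps any file matching `pyflink-udf-runner.*` out of the `bin/` directory of the binary distribution, while the rest of `flink-python/bin/` is still copied. A minimal sketch of the pattern's effect, using hypothetical script names (Python's `fnmatch` agrees with the Ant-style matcher for a simple basename pattern like this):

```python
from fnmatch import fnmatch

# Assembly-style exclude pattern from the diff above.
PATTERN = "pyflink-udf-runner.*"

# Hypothetical file names; the real contents of flink-python/bin are an assumption.
for name in ["pyflink-udf-runner.sh", "pyflink-udf-runner.bat", "pyflink-shell.sh"]:
    action = "excluded" if fnmatch(name, PATTERN) else "copied to bin/"
    print(name, "->", action)
```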
File renamed without changes.
File renamed without changes.
134 changes: 1 addition & 133 deletions flink-python/pyflink/fn_execution/boot.py
@@ -28,7 +28,6 @@
harness of Apache Beam.
"""
import argparse
-import hashlib
import os
from subprocess import call

@@ -38,17 +37,10 @@

from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub
from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
-from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub
-from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest,
-                                                               GetArtifactRequest)
from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor

-from distutils.dist import Distribution
-
from google.protobuf import json_format, text_format

-from pkg_resources import get_distribution, parse_version
-

def check_not_empty(check_str, error_message):
    if check_str == "":
@@ -58,81 +50,6 @@ def check_not_empty(check_str, error_message):

python_exec = sys.executable

-PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE"
-PYTHON_REQUIREMENTS_CACHE = "_PYTHON_REQUIREMENTS_CACHE"
-PYTHON_REQUIREMENTS_INSTALL_DIR = "_PYTHON_REQUIREMENTS_INSTALL_DIR"
-
-
-def append_path_to_env(env, name, value):
-    if name in env:
-        env[name] = os.pathsep.join([value, env[name]])
-    else:
-        env[name] = value
-
-
-def get_site_packages_paths(prefix):
-    install_obj = Distribution().get_command_obj('install', create=True)
-    install_obj.prefix = prefix
-    install_obj.finalize_options()
-    installed_dir = [install_obj.install_purelib]
-    if install_obj.install_purelib != install_obj.install_platlib:
-        installed_dir.append(install_obj.install_platlib)
-    return installed_dir
-
-
-def get_prefix_option(requirements_install_path):
-    pip_version = get_distribution("pip").version
-    # since '--prefix' option is only supported for pip 8.0+, so here we fallback to
-    # use '--install-option' when the pip version is lower than 8.0.0.
-    if parse_version(pip_version) >= parse_version('8.0.0'):
-        return ["--prefix", requirements_install_path]
-    else:
-        return ['--install-option', '--prefix=' + requirements_install_path]
-
-
-def pip_install_requirements():
-    if (PYTHON_REQUIREMENTS_FILE in os.environ
-            and PYTHON_REQUIREMENTS_INSTALL_DIR in os.environ):
-        requirements_file_path = os.environ[PYTHON_REQUIREMENTS_FILE]
-        requirements_install_path = os.environ[PYTHON_REQUIREMENTS_INSTALL_DIR]
-        if PYTHON_REQUIREMENTS_CACHE in os.environ:
-            requirements_cache_path = os.environ[PYTHON_REQUIREMENTS_CACHE]
-        else:
-            requirements_cache_path = None
-
-        env = dict(os.environ)
-        installed_python_path = os.pathsep.join(get_site_packages_paths(requirements_install_path))
-        installed_python_script_path = os.path.join(requirements_install_path, "bin")
-        append_path_to_env(env, "PYTHONPATH", installed_python_path)
-        append_path_to_env(env, "PATH", installed_python_script_path)
-
-        pip_install_commands = [python_exec, "-m", "pip", "install", "--ignore-installed", "-r",
-                                requirements_file_path]
-        pip_install_commands.extend(get_prefix_option(requirements_install_path))
-        if requirements_cache_path is not None:
-            pip_install_commands.extend(["--find-links", requirements_cache_path])
-
-        max_retry_times = 3
-        cur_retry = 0
-        while cur_retry < max_retry_times:
-            cur_retry += 1
-            logging.info("Run command: %s with retry (%d/%d)\n" % (" ".join(pip_install_commands),
-                                                                   cur_retry, max_retry_times))
-            exit_code = call(
-                pip_install_commands, stdout=sys.stdout, stderr=sys.stderr, env=env)
-            if exit_code != 0:
-                if cur_retry < max_retry_times:
-                    logging.error("Run command: %s error! exit code: %d. Retry to run again!" %
-                                  (" ".join(pip_install_commands), exit_code))
-                else:
-                    raise Exception(
-                        "Run command: %s error! exit code: %d. Max retry times exhausted!" %
-                        (" ".join(pip_install_commands), exit_code))
-            else:
-                break
-        os.environ["PYTHONPATH"] = env["PYTHONPATH"]
-        os.environ["PATH"] = env["PATH"]
-

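The bulk of the deletion above is the requirements-installation machinery: `pip_install_requirements()` built a `python -m pip install` command from the `_PYTHON_REQUIREMENTS_*` environment variables and retried it up to three times before giving up. A stripped-down, self-contained sketch of that retry-around-subprocess pattern (the placeholder command stands in for the real pip invocation):

```python
import logging
import sys
from subprocess import call

logging.basicConfig(level=logging.INFO)

# Placeholder command; the deleted code assembled a "python -m pip install ..." list.
commands = [sys.executable, "-c", "print('installing requirements')"]

max_retry_times = 3
for cur_retry in range(1, max_retry_times + 1):
    logging.info("Run command: %s (%d/%d)", " ".join(commands), cur_retry, max_retry_times)
    exit_code = call(commands, stdout=sys.stdout, stderr=sys.stderr)
    if exit_code == 0:
        break  # success, stop retrying
    if cur_retry == max_retry_times:
        raise Exception("command failed after %d attempts, exit code %d"
                        % (max_retry_times, exit_code))
```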
if __name__ == "__main__":
    # print INFO and higher level messages
@@ -143,27 +60,23 @@ def pip_install_requirements():
    parser.add_argument("--id", default="", help="Local identifier (required).")
    parser.add_argument("--logging_endpoint", default="",
                        help="Logging endpoint (required).")
-    parser.add_argument("--artifact_endpoint", default="",
-                        help="Artifact endpoint (required).")
    parser.add_argument("--provision_endpoint", default="",
                        help="Provision endpoint (required).")
    parser.add_argument("--control_endpoint", default="",
                        help="Control endpoint (required).")
    parser.add_argument("--semi_persist_dir", default="/tmp",
                        help="Local semi-persistent directory (optional).")

-    args = parser.parse_args()
+    args = parser.parse_known_args()[0]

    worker_id = args.id
    logging_endpoint = args.logging_endpoint
-    artifact_endpoint = args.artifact_endpoint
    provision_endpoint = args.provision_endpoint
    control_endpoint = args.control_endpoint
    semi_persist_dir = args.semi_persist_dir

    check_not_empty(worker_id, "No id provided.")
    check_not_empty(logging_endpoint, "No logging endpoint provided.")
-    check_not_empty(artifact_endpoint, "No artifact endpoint provided.")
    check_not_empty(provision_endpoint, "No provision endpoint provided.")
    check_not_empty(control_endpoint, "No control endpoint provided.")
@@ -177,51 +90,6 @@ def pip_install_requirements():
        info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
        options = json_format.MessageToJson(info.pipeline_options)

-    staged_dir = os.path.join(semi_persist_dir, "staged")
-
-    # download files
-    with grpc.insecure_channel(artifact_endpoint) as channel:
-        client = ArtifactRetrievalServiceStub(channel=channel)
-        # get file list via retrieval token
-        response = client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
-                                      metadata=metadata)
-        artifacts = response.manifest.artifact
-        # download files and check hash values
-        for artifact in artifacts:
-            name = artifact.name
-            permissions = artifact.permissions
-            sha256 = artifact.sha256
-            file_path = os.path.join(staged_dir, name)
-            if os.path.exists(file_path):
-                with open(file_path, "rb") as f:
-                    sha256obj = hashlib.sha256()
-                    sha256obj.update(f.read())
-                    hash_value = sha256obj.hexdigest()
-                if hash_value == sha256:
-                    logging.info("The file: %s already exists and its sha256 hash value: %s is the "
-                                 "same as the expected hash value, skipped." % (file_path, sha256))
-                    continue
-                else:
-                    os.remove(file_path)
-            if not os.path.exists(os.path.dirname(file_path)):
-                os.makedirs(os.path.dirname(file_path), 0o755)
-            stream = client.GetArtifact(
-                GetArtifactRequest(name=name, retrieval_token=info.retrieval_token),
-                metadata=metadata)
-            with open(file_path, "wb") as f:
-                sha256obj = hashlib.sha256()
-                for artifact_chunk in stream:
-                    sha256obj.update(artifact_chunk.data)
-                    f.write(artifact_chunk.data)
-                hash_value = sha256obj.hexdigest()
-            if hash_value != sha256:
-                raise Exception("The sha256 hash value: %s of the downloaded file: %s is not the"
-                                " same as the expected hash value: %s" %
-                                (hash_value, file_path, sha256))
-            os.chmod(file_path, int(str(permissions), 8))
-
-    pip_install_requirements()
-
    os.environ["WORKER_ID"] = worker_id
    os.environ["PIPELINE_OPTIONS"] = options
    os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir
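The deleted download loop above combined chunked streaming with incremental sha256 verification: reuse a cached artifact if its digest matches, otherwise remove it and fetch again. A condensed, self-contained sketch of that verify-or-redownload pattern (the file path and expected digest are made up for illustration):

```python
import hashlib
import os


def file_matches_sha256(file_path, expected_sha256, chunk_size=8192):
    """Return True if file_path exists and its sha256 digest equals expected_sha256."""
    if not os.path.exists(file_path):
        return False
    sha256obj = hashlib.sha256()
    with open(file_path, "rb") as f:
        # Hash in chunks so large artifacts do not have to fit in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            sha256obj.update(chunk)
    return sha256obj.hexdigest() == expected_sha256


# Hypothetical values for illustration only.
if file_matches_sha256("/tmp/staged/udf.zip", "ab" * 32):
    print("cached artifact is valid, skipping download")
else:
    print("artifact missing or corrupt, re-downloading")
```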
114 changes: 0 additions & 114 deletions flink-python/pyflink/fn_execution/tests/process_mode_test_data.py

This file was deleted.

