Commit 7b4c97b
[FLINK-19974][python][e2e] Extract PyFlink YARN per-job test into a separate script

This closes apache#13930.
dianfu committed Nov 9, 2020
1 parent 4f9881a commit 7b4c97b
Showing 3 changed files with 82 additions and 56 deletions.
4 changes: 4 additions & 0 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -229,6 +229,10 @@ run_test "Shaded Hadoop S3A with credentials provider end-to-end test" "$END_TO_
 if [[ `uname -i` != 'aarch64' ]]; then
     run_test "PyFlink end-to-end test" "$END_TO_END_DIR/test-scripts/test_pyflink.sh" "skip_check_exceptions"
 fi
+# These tests are known to fail on JDK11. See FLINK-13719
+if [[ ${PROFILE} != *"jdk11"* ]] && [[ `uname -i` != 'aarch64' ]]; then
+    run_test "PyFlink YARN per-job on Docker test" "$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh" "skip_check_exceptions"
+fi

################################################################################
# Sticky Scheduling
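The added block above gates the new test on both the CI profile and the host architecture before registering it. A minimal sketch of how those two skip conditions could be folded into one helper; maybe_run_test is purely illustrative (not part of this commit), while PROFILE and run_test come from the surrounding nightly-test harness:

maybe_run_test() {
    local description="$1" script="$2"
    # Skip on JDK 11 profiles (known failures, FLINK-13719) and on aarch64 hosts.
    if [[ ${PROFILE:-} == *"jdk11"* ]] || [[ $(uname -i) == 'aarch64' ]]; then
        echo "Skipping '${description}' on this profile/platform"
        return 0
    fi
    run_test "${description}" "${script}" "skip_check_exceptions"
}

maybe_run_test "PyFlink YARN per-job on Docker test" "$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh"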
62 changes: 6 additions & 56 deletions flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -50,11 +50,6 @@ function sort_msg {
echo "${sorted[*]}"
}

function test_clean_up {
stop_cluster
stop_kafka_cluster
}

CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
source "${CURRENT_DIR}"/common.sh
source "${CURRENT_DIR}"/kafka_sql_common.sh \
@@ -63,6 +58,12 @@ source "${CURRENT_DIR}"/kafka_sql_common.sh \
     ${CONFLUENT_MAJOR_VERSION} \
     ${KAFKA_SQL_VERSION}
 
+function test_clean_up {
+    stop_cluster
+    stop_kafka_cluster
+}
+on_exit test_clean_up
+
 cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
 
 echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
@@ -99,7 +100,6 @@ deactivate
cd "${CURRENT_DIR}"

start_cluster
on_exit test_clean_up

echo "Test PyFlink Table job:"

@@ -285,53 +285,3 @@ if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then
echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--"
exit 1
fi

stop_cluster

# These tests are known to fail on JDK11. See FLINK-13719
if [[ ${PROFILE} != *"jdk11"* ]]; then
cd "${CURRENT_DIR}/../"
source "${CURRENT_DIR}"/common_yarn_docker.sh
# test submitting on yarn
start_hadoop_cluster_and_prepare_flink

# copy test files
docker cp "${FLINK_PYTHON_DIR}/dev/lint-python.sh" master:/tmp/
docker cp "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar" master:/tmp/
docker cp "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" master:/tmp/
docker cp "${REQUIREMENTS_PATH}" master:/tmp/
docker cp "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" master:/tmp/
PYFLINK_PACKAGE_FILE=$(basename "${FLINK_PYTHON_DIR}"/dist/apache-flink-*.tar.gz)
docker cp "${FLINK_PYTHON_DIR}/dist/${PYFLINK_PACKAGE_FILE}" master:/tmp/

# prepare environment
docker exec master bash -c "
/tmp/lint-python.sh -s miniconda
source /tmp/.conda/bin/activate
pip install /tmp/${PYFLINK_PACKAGE_FILE}
conda install -y -q zip=3.0
rm -rf /tmp/.conda/pkgs
cd /tmp
zip -q -r /tmp/venv.zip .conda
echo \"taskmanager.memory.task.off-heap.size: 100m\" >> \"/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml\"
"

docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
/home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
-pyfs /tmp/add_one.py \
-pyreq /tmp/requirements.txt \
-pyarch /tmp/venv.zip \
-pyexec venv.zip/.conda/bin/python \
/tmp/PythonUdfSqlJobExample.jar"

docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
/home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
-pyfs /tmp/add_one.py \
-pyreq /tmp/requirements.txt \
-pyarch /tmp/venv.zip \
-pyexec venv.zip/.conda/bin/python \
-py /tmp/python_job.py \
pipeline.jars file:/tmp/PythonUdfSqlJobExample.jar"
fi
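Note the ordering change in test_pyflink.sh above: test_clean_up is now registered via on_exit immediately after it is defined, rather than after start_cluster, so the Flink and Kafka clusters are torn down even if a later step fails before the cluster is fully up. A minimal sketch of that registration pattern, assuming on_exit (defined in common.sh) simply chains handlers onto the shell's EXIT trap; the names below are an illustrative re-implementation, not the actual common.sh code:

# Illustrative sketch of an on_exit-style helper.
_exit_handlers=()

on_exit() {
    # Register a function to run when the script exits, in registration order.
    _exit_handlers+=("$1")
}

_run_exit_handlers() {
    local handler
    for handler in "${_exit_handlers[@]}"; do
        "${handler}"
    done
}

trap _run_exit_handlers EXIT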
72 changes: 72 additions & 0 deletions flink-end-to-end-tests/test-scripts/test_pyflink_yarn.sh
@@ -0,0 +1,72 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -Eeuo pipefail
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P`
+FLINK_PYTHON_TEST_DIR=`cd "${CURRENT_DIR}/../flink-python-test" && pwd -P`
+REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
+
+echo "pytest==4.4.1" > "${REQUIREMENTS_PATH}"
+
+# These tests are known to fail on JDK11. See FLINK-13719
+cd "${CURRENT_DIR}/../"
+source "${CURRENT_DIR}"/common_yarn_docker.sh
+# test submitting on yarn
+start_hadoop_cluster_and_prepare_flink
+
+# copy test files
+docker cp "${FLINK_PYTHON_DIR}/dev/lint-python.sh" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" master:/tmp/
+docker cp "${REQUIREMENTS_PATH}" master:/tmp/
+docker cp "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" master:/tmp/
+PYFLINK_PACKAGE_FILE=$(basename "${FLINK_PYTHON_DIR}"/dist/apache-flink-*.tar.gz)
+docker cp "${FLINK_PYTHON_DIR}/dist/${PYFLINK_PACKAGE_FILE}" master:/tmp/
+
+# prepare environment
+docker exec master bash -c "
+/tmp/lint-python.sh -s miniconda
+source /tmp/.conda/bin/activate
+pip install /tmp/${PYFLINK_PACKAGE_FILE}
+conda install -y -q zip=3.0
+rm -rf /tmp/.conda/pkgs
+cd /tmp
+zip -q -r /tmp/venv.zip .conda
+"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    /tmp/PythonUdfSqlJobExample.jar"
+
+docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
+    export PYFLINK_CLIENT_EXECUTABLE=/tmp/.conda/bin/python && \
+    /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -ytm 1500 -yjm 1000 \
+    -pyfs /tmp/add_one.py \
+    -pyreq /tmp/requirements.txt \
+    -pyarch /tmp/venv.zip \
+    -pyexec venv.zip/.conda/bin/python \
+    -py /tmp/python_job.py \
+    pipeline.jars file:/tmp/PythonUdfSqlJobExample.jar"
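For reference, a hedged recap of the flink run options the new script exercises (Flink 1.11/1.12-era CLI; see the official PyFlink documentation for the authoritative descriptions):

# -m yarn-cluster   submit the job to a dedicated YARN per-job cluster
# -ytm / -yjm       TaskManager / JobManager memory, in MB
# -pyfs             Python files shipped to the cluster (here the add_one.py UDF module)
# -pyreq            a requirements.txt whose packages are installed for the Python workers
# -pyarch           an archive distributed to the workers (here the zipped conda env)
# -pyexec           Python interpreter the workers use, resolved inside the extracted archive
# -py               the entry-point Python script of the job (second invocation only)

The first invocation drives the Python UDF through a Java SQL job (PythonUdfSqlJobExample.jar); the second submits a Python job (python_job.py) that still depends on the example jar.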
