Skip to content

Commit

Permalink
[FLINK-10934][e2e] Add e2e tests for Kubernetes application mode
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyang0918 authored and kl0u committed May 13, 2020
1 parent c84e48c commit 831ca0b
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 16 deletions.
1 change: 1 addition & 0 deletions flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ if [[ ${PROFILE} != *"jdk11"* ]]; then

run_test "Run Kubernetes test" "$END_TO_END_DIR/test-scripts/test_kubernetes_embedded_job.sh"
run_test "Run kubernetes session test" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh"
run_test "Run kubernetes application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh"

run_test "Running Flink over NAT end-to-end test" "$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions"

Expand Down
21 changes: 21 additions & 0 deletions flink-end-to-end-tests/test-scripts/common_kubernetes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,25 @@ function wait_rest_endpoint_up_k8s {
exit 1
}

function cleanup {
if [ $TRAPPED_EXIT_CODE != 0 ];then
debug_and_show_logs
fi
internal_cleanup
kubectl wait --for=delete pod --all=true
stop_kubernetes
}

function setConsoleLogging {
cat >> $FLINK_DIR/conf/log4j.properties <<END
rootLogger.appenderRef.console.ref = ConsoleAppender
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %-60c %x - %m%n
END
}

on_exit cleanup
62 changes: 62 additions & 0 deletions flink-end-to-end-tests/test-scripts/test_kubernetes_application.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

source "$(dirname "$0")"/common_kubernetes.sh

CLUSTER_ROLE_BINDING="flink-role-binding-default"
CLUSTER_ID="flink-native-k8s-application-1"
FLINK_IMAGE_NAME="test_kubernetes_application"
LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"

function internal_cleanup {
kubectl delete deployment ${CLUSTER_ID}
kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
}

setConsoleLogging

start_kubernetes

cd "$DOCKER_MODULE_DIR"
build_image_with_jar ${FLINK_DIR}/examples/batch/WordCount.jar ${FLINK_IMAGE_NAME}

kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default

mkdir -p "$LOCAL_LOGS_PATH"

# Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube.
"$FLINK_DIR"/bin/flink run-application -t kubernetes-application \
-Dkubernetes.cluster-id=${CLUSTER_ID} \
-Dkubernetes.container.image=${FLINK_IMAGE_NAME} \
-Djobmanager.memory.process.size=1088m \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dkubernetes.taskmanager.cpu=0.5 \
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:https:///opt/flink/usrlib/WordCount.jar

kubectl wait --for=condition=Available --timeout=30s deploy/${CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
wait_rest_endpoint_up_k8s $jm_pod_name

# The Flink cluster will be destroyed immediately once the job finished or failed. So we check jobmanager logs
# instead of checking the result
kubectl logs -f $jm_pod_name >$LOCAL_LOGS_PATH/jobmanager.log
grep -E "Job [A-Za-z0-9]+ reached globally terminal state FINISHED" $LOCAL_LOGS_PATH/jobmanager.log

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,10 @@ export OUTPUT_FILE=kubernetes_wc_out
export FLINK_JOB_PARALLELISM=1
export FLINK_JOB_ARGUMENTS='"--output", "/cache/kubernetes_wc_out"'

SUCCEEDED=1

function cleanup {
if [ $SUCCEEDED != 0 ];then
debug_and_show_logs
fi
function internal_cleanup {
kubectl delete job flink-job-cluster
kubectl delete service flink-job-cluster
kubectl delete deployment flink-task-manager
stop_kubernetes
}

start_kubernetes
Expand All @@ -54,4 +48,3 @@ kubectl wait --for=condition=complete job/flink-job-cluster --timeout=1h
kubectl cp `kubectl get pods | awk '/task-manager/ {print $1}'`:/cache/${OUTPUT_FILE} ${OUTPUT_VOLUME}/${OUTPUT_FILE}

check_result_hash "WordCount" ${OUTPUT_VOLUME}/${OUTPUT_FILE} "${RESULT_HASH}"
SUCCEEDED=$?
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,9 @@ LOCAL_OUTPUT_PATH="${TEST_DATA_DIR}/out/wc_out"
OUTPUT_PATH="/tmp/wc_out"
ARGS="--output ${OUTPUT_PATH}"

SUCCEEDED=1

function cleanup {
if [ $SUCCEEDED != 0 ];then
debug_and_show_logs
fi
function internal_cleanup {
kubectl delete deployment ${CLUSTER_ID}
kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
stop_kubernetes
}

function setConsoleLogging {
Expand Down Expand Up @@ -81,4 +75,3 @@ wait_rest_endpoint_up_k8s $jm_pod_name
kubectl cp `kubectl get pods | awk '/taskmanager/ {print $1}'`:${OUTPUT_PATH} ${LOCAL_OUTPUT_PATH}

check_result_hash "WordCount" "${LOCAL_OUTPUT_PATH}" "${RESULT_HASH}"
SUCCEEDED=$?

0 comments on commit 831ca0b

Please sign in to comment.