Skip to content

Commit

Permalink
[FLINK-10842] [e2e] Fix broken waiting loops in common.sh
Browse files Browse the repository at this point in the history
This closes apache#7073.
  • Loading branch information
azagrebin authored and twalthr committed Nov 23, 2018
1 parent 33f3de0 commit 76afc31
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
55 changes: 38 additions & 17 deletions flink-end-to-end-tests/test-scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,22 @@ function start_local_zk {
function wait_dispatcher_running {
# wait at most 10 seconds until the dispatcher is up
local QUERY_URL="${REST_PROTOCOL}:https://${NODENAME}:8081/taskmanagers"
for i in {1..10}; do
local TIMEOUT=10
for i in $(seq 1 ${TIMEOUT}); do
# without the || true this would exit our script if the JobManager is not yet up
QUERY_RESULT=$(curl ${CURL_SSL_ARGS} "$QUERY_URL" 2> /dev/null || true)

# ensure the taskmanagers field is there at all and is not empty
if [[ ${QUERY_RESULT} =~ \{\"taskmanagers\":\[.+\]\} ]]; then
echo "Dispatcher REST endpoint is up."
break
return
fi

echo "Waiting for dispatcher REST endpoint to come up..."
sleep 1
done
echo "Dispatcher REST endpoint has not started within a timeout of ${TIMEOUT} sec"
exit 1
}

function start_cluster {
Expand All @@ -242,30 +245,45 @@ function start_taskmanagers {
}

function start_and_wait_for_tm {
local url="${REST_PROTOCOL}:https://${NODENAME}:8081/taskmanagers"

tm_query_result=$(curl ${CURL_SSL_ARGS} -s "${url}")

tm_query_result=`query_running_tms`
# we assume that the cluster is running
if ! [[ ${tm_query_result} =~ \{\"taskmanagers\":\[.*\]\} ]]; then
echo "Your cluster seems to be unresponsive at the moment: ${tm_query_result}" 1>&2
exit 1
fi

running_tms=`curl ${CURL_SSL_ARGS} -s "${url}" | grep -o "id" | wc -l`

running_tms=`query_number_of_running_tms`
${FLINK_DIR}/bin/taskmanager.sh start
wait_for_number_of_running_tms $((running_tms+1))
}

for i in {1..10}; do
local new_running_tms=`curl ${CURL_SSL_ARGS} -s "${url}" | grep -o "id" | wc -l`
if [ $((new_running_tms-running_tms)) -eq 0 ]; then
echo "TaskManager is not yet up."
function query_running_tms {
local url="${REST_PROTOCOL}:https://${NODENAME}:8081/taskmanagers"
curl ${CURL_SSL_ARGS} -s "${url}"
}

function query_number_of_running_tms {
query_running_tms | grep -o "id" | wc -l
}

function wait_for_number_of_running_tms {
local TM_NUM_TO_WAIT=${1}
local TIMEOUT_COUNTER=10
local TIMEOUT_INC=4
local TIMEOUT=$(( $TIMEOUT_COUNTER * $TIMEOUT_INC ))
local TM_NUM_TEXT="Number of running task managers"
for i in $(seq 1 ${TIMEOUT_COUNTER}); do
local TM_NUM=`query_number_of_running_tms`
if [ $((TM_NUM - TM_NUM_TO_WAIT)) -eq 0 ]; then
echo "${TM_NUM_TEXT} has reached ${TM_NUM_TO_WAIT}."
return
else
echo "TaskManager is up."
break
echo "${TM_NUM_TEXT} ${TM_NUM} is not yet ${TM_NUM_TO_WAIT}."
fi
sleep 4
sleep ${TIMEOUT_INC}
done
echo "${TM_NUM_TEXT} has not reached ${TM_NUM_TO_WAIT} within a timeout of ${TIMEOUT} sec"
exit 1
}

function check_logs_for_errors {
Expand Down Expand Up @@ -376,17 +394,20 @@ function wait_for_job_state_transition {
}

function wait_job_running {
for i in {1..10}; do
local TIMEOUT=10
for i in $(seq 1 ${TIMEOUT}); do
JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list -r | grep "$1")

if [[ "$JOB_LIST_RESULT" == "" ]]; then
echo "Job ($1) is not yet running."
else
echo "Job ($1) is running."
break
return
fi
sleep 1
done
echo "Job ($1) has not started within a timeout of ${TIMEOUT} sec"
exit 1
}

function wait_job_terminal_state {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ function run_test() {
fi

kill_random_taskmanager
wait_for_number_of_running_tms 0

latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
echo "Latest snapshot count was ${latest_snapshot_count}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,10 @@ fi

DATASTREAM_JOB=$($JOB_CMD | grep "Job has been submitted with JobID" | sed 's/.* //g')

wait_job_running $DATASTREAM_JOB

if [[ $SIMULATE_FAILURE == "true" ]]; then
wait_job_terminal_state $DATASTREAM_JOB FAILED
else
wait_job_running $DATASTREAM_JOB
wait_num_checkpoints $DATASTREAM_JOB 1
wait_oper_metric_num_in_records SemanticsCheckMapper.0 200

Expand Down

0 comments on commit 76afc31

Please sign in to comment.