[FLINK-10805] [e2e] Fix failing end-to-end tests
Fix the test_confluent_schema_registry.sh and test_sql_client.sh end-to-end tests, which failed
because of missing arguments for kafka-common.sh and the newly introduced set -e flag.

This closes apache#7034.
tillrohrmann authored and twalthr committed Nov 7, 2018
1 parent 1a0a005 commit 8d10698
Showing 4 changed files with 20 additions and 11 deletions.
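Both failures trace back to errexit: with set -e active, any command that returns a non-zero status outside a conditional context aborts the script. A minimal sketch (illustrative, not part of the commit) of the failure mode and the || true workaround used below:

#!/usr/bin/env bash
set -Eeuo pipefail

# grep exits 1 when nothing matches; with pipefail that status
# survives the pipe through tail, and errexit then aborts the
# script before a polling loop ever gets a second attempt:
#   N=$(grep -o "pattern" app.log | tail -1)

# Appending `|| true` forces status 0, letting the loop retry:
N=$(grep -o "pattern" app.log | tail -1 || true)
echo "matched so far: ${N:-<nothing yet>}"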
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common.sh
@@ -432,7 +432,7 @@ function wait_job_terminal_state {
   echo "Waiting for job ($job) to reach terminal state $terminal_state ..."
 
   while : ; do
-    N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1)
+    N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1 || true)
 
     if [[ -z $N ]]; then
       sleep 1
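An equivalent, errexit-safe alternative (a sketch only, not what the commit does) is to run the probe inside the loop's conditional, where set -e is suspended by design:

# Sketch: commands in a condition position are exempt from
# errexit, so the `|| true` becomes unnecessary:
until grep -q "Job $job reached globally terminal state $terminal_state" \
    "$FLINK_DIR"/log/*standalonesession*.log; do
  sleep 1
done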
6 changes: 3 additions & 3 deletions flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -43,7 +43,7 @@ function wait_elasticsearch_working {
   echo "Waiting for Elasticsearch node to work..."
 
   for ((i=1;i<=60;i++)); do
-    curl -XGET 'http://localhost:9200'
+    curl -XGET 'http://localhost:9200' || true
 
     # make sure the elasticsearch node is actually working
     if [ $? -ne 0 ]; then
@@ -67,7 +67,7 @@ function verify_result_line_number {
   fi
 
   while : ; do
-    curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output
+    curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output || true
 
     if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
       echo "Elasticsearch end to end test pass."
@@ -86,7 +86,7 @@ function verify_result_hash {
   local hash=$4
 
   while : ; do
-    curl "localhost:9200/${index}/_search?q=*&pretty" > $TEST_DATA_DIR/es_output
+    curl "localhost:9200/${index}/_search?q=*&pretty" > $TEST_DATA_DIR/es_output || true
 
     if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/es_output)" ]; then
       break
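The same || true guard protects all three curl probes in this file. One caveat worth a sketch (an illustration, not part of the commit): `cmd || true` always leaves $? equal to 0, so a script that still wants the probe's real exit status has to capture it in the same compound:

# Sketch: stay errexit-safe while recording curl's real exit
# status for a later check.
status=0
curl -sS -XGET 'http://localhost:9200' || status=$?
if [ "$status" -ne 0 ]; then
  echo "Elasticsearch not up yet (curl exit $status), retrying..."
  sleep 1
fi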
4 changes: 3 additions & 1 deletion flink-end-to-end-tests/test-scripts/test_confluent_schema_registry.sh
@@ -17,8 +17,10 @@
 # limitations under the License.
 ################################################################################
 
+set -Eeuo pipefail
+
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
+source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
 
 function verify_output {
   local expected=$(printf $1)
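Two things change here: the script opts into strict mode, and kafka-common.sh is handed the version arguments it expects. A hedged sketch of the interaction, assuming kafka-common.sh reads its versions from $1-$3 (the variable names are illustrative, the real helper's interface may differ):

# What `set -Eeuo pipefail` turns on:
#   -E           ERR traps fire inside functions and subshells too
#   -e           exit on any unhandled non-zero status
#   -u           referencing an unset variable is a fatal error
#   -o pipefail  a pipeline fails if any stage fails
set -Eeuo pipefail

# Sketch of a sourced helper consuming positional arguments.
# Under `set -u`, sourcing it with no arguments makes the bare
# $1 reference abort the test immediately, hence the fix:
KAFKA_VERSION=$1           # e.g. 0.10.2.0
CONFLUENT_VERSION=$2       # e.g. 3.2.0
CONFLUENT_MAJOR_VERSION=$3 # e.g. 3.2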
19 changes: 13 additions & 6 deletions flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -17,8 +17,10 @@
 # limitations under the License.
 ################################################################################
 
+set -Eeuo pipefail
+
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
+source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
 source "$(dirname "$0")"/elasticsearch-common.sh
 
 SQL_TOOLBOX_JAR=$END_TO_END_DIR/flink-sql-client-test/target/SqlToolbox.jar
@@ -306,7 +308,7 @@ EOF
 
 echo "Executing SQL: Kafka JSON -> Kafka Avro"
 
-read -r -d '' SQL_STATEMENT_1 << EOF
+SQL_STATEMENT_1=$(cat << EOF
 INSERT INTO AvroBothTable
   SELECT
     CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
@@ -320,6 +322,7 @@ INSERT INTO AvroBothTable
     event.message,
     TUMBLE(rowtime, INTERVAL '1' HOUR)
 EOF
+)
 
 echo "$SQL_STATEMENT_1"
 
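The `read -r -d '' VAR << EOF` idiom always returns a non-zero status: read only reports success when it sees its delimiter, and the NUL byte requested by -d '' never arrives before end-of-input. Under set -e that single status aborts the test, which is why each statement is rewritten as a command substitution over cat. A standalone sketch of the two behaviors:

#!/usr/bin/env bash
set -e

# Fails under errexit: read hits EOF before any NUL delimiter,
# returns 1, and the script dies right here.
#   read -r -d '' SQL << EOF
#   SELECT 1
#   EOF

# Safe: cat exits 0 after draining the heredoc, and the trailing
# newline is trimmed by the command substitution.
SQL=$(cat << EOF
SELECT 1
EOF
)
echo "$SQL"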
@@ -331,11 +334,12 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 echo "Executing SQL: Kafka Avro -> Filesystem CSV"
 
-read -r -d '' SQL_STATEMENT_2 << EOF
+SQL_STATEMENT_2=$(cat << EOF
 INSERT INTO CsvSinkTable
   SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
   FROM AvroBothTable
 EOF
+)
 
 echo "$SQL_STATEMENT_2"
 
@@ -360,13 +364,14 @@ check_result_hash "SQL Client Kafka" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
 
 echo "Executing SQL: Values -> Elasticsearch (upsert)"
 
-read -r -d '' SQL_STATEMENT_3 << EOF
+SQL_STATEMENT_3=$(cat << EOF
 INSERT INTO ElasticsearchUpsertSinkTable
   SELECT user_id, user_name, COUNT(*) AS user_count
   FROM (VALUES (1, 'Bob'), (22, 'Alice'), (42, 'Greg'), (42, 'Greg'), (42, 'Greg'), (1, 'Bob'))
     AS UserCountTable(user_id, user_name)
   GROUP BY user_id, user_name
 EOF
+)
 
 JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --library $SQL_JARS_DIR \
@@ -380,7 +385,7 @@ verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "9
 
 echo "Executing SQL: Values -> Elasticsearch (append, no key)"
 
-read -r -d '' SQL_STATEMENT_4 << EOF
+SQL_STATEMENT_4=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT *
   FROM (
@@ -393,6 +398,7 @@ INSERT INTO ElasticsearchAppendSinkTable
     (1, 'Bob', CAST(0 AS BIGINT)))
     AS UserCountTable(user_id, user_name, user_count)
 EOF
+)
 
 JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --library $SQL_JARS_DIR \
@@ -407,7 +413,7 @@ verify_result_line_number 9 "$ELASTICSEARCH_INDEX"
 
 echo "Executing SQL: Match recognize -> Elasticsearch"
 
-read -r -d '' SQL_STATEMENT_5 << EOF
+SQL_STATEMENT_5=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count
   FROM (
@@ -423,6 +429,7 @@ INSERT INTO ElasticsearchAppendSinkTable
       A as user = 'Alice'
   ) T
 EOF
+)
 
 JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --library $SQL_JARS_DIR \
