From 71095dcb098c5b03a656a1f3bb48634294e537bb Mon Sep 17 00:00:00 2001 From: "Tzu-Li (Gordon) Tai" Date: Tue, 22 May 2018 15:10:32 +0800 Subject: [PATCH] [FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests - Rework e2e test job modules to have correct Maven POM - Parameterize num of records to write to Elasticsearch - Parameterize Elasticsearch download URL and version in test script - Improve robustness of test - Move more Elasticsearch functionality to elasticsearch-common.sh This closes #5761. --- .../flink-elasticsearch1-test/pom.xml | 45 ++------ .../tests/Elasticsearch1SinkExample.java | 18 +-- .../flink-elasticsearch2-test/pom.xml | 65 ++--------- .../tests/Elasticsearch2SinkExample.java | 17 +-- .../flink-elasticsearch5-test/pom.xml | 78 ++----------- .../tests/Elasticsearch5SinkExample.java | 18 +-- flink-end-to-end-tests/run-nightly-tests.sh | 23 +++- .../test-scripts/elasticsearch-common.sh | 48 +++++--- .../test_streaming_elasticsearch.sh | 51 ++++++++ .../test_streaming_elasticsearch125.sh | 109 ------------------ 10 files changed, 167 insertions(+), 305 deletions(-) create mode 100755 flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh delete mode 100755 flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml index b983e72793b65..7c46ae135c0a5 100644 --- a/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml @@ -21,16 +21,16 @@ under the License. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + - flink-end-to-end-tests org.apache.flink + flink-end-to-end-tests 1.6-SNAPSHOT .. - 4.0.0 - - flink-elasticsearch1-test_${scala.binary.version} + flink-elasticsearch1-test flink-elasticsearch1-test jar @@ -41,7 +41,6 @@ under the License. ${project.version} provided - org.apache.flink flink-connector-elasticsearch_${scala.binary.version} @@ -56,26 +55,18 @@ under the License. maven-shade-plugin 3.0.0 - package shade - true + Elasticsearch1SinkExample com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - - - org.apache.flink.streaming.tests.Elasticsearch1SinkExample - - *:* @@ -86,27 +77,11 @@ under the License. - - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - 1.7 - - - rename - package - - run - - - - - + + + org.apache.flink.streaming.tests.Elasticsearch1SinkExample + + diff --git a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java index bfdb8066e56d4..18fa05a8976b5 100644 --- a/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch1SinkExample.java @@ -41,13 +41,14 @@ * End to end test for Elasticsearch1Sink. */ public class Elasticsearch1SinkExample { + public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index --type "); + "Usage: --numRecords --index --type "); return; } @@ -55,12 +56,13 @@ public static void main(String[] args) throws Exception { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream source = env.generateSequence(0, 20).map(new MapFunction() { - @Override - public String map(Long value) throws Exception { - return "message # " + value; - } - }); + DataStream source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction() { + @Override + public String map(Long value) throws Exception { + return "message # " + value; + } + }); Map userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml index 178d63276740f..4997910264451 100644 --- a/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml @@ -20,16 +20,17 @@ under the License. + + 4.0.0 + - flink-end-to-end-tests org.apache.flink + flink-end-to-end-tests 1.6-SNAPSHOT .. - 4.0.0 - - flink-elasticsearch2-test_${scala.binary.version} + flink-elasticsearch2-test flink-elasticsearch2-test jar @@ -40,31 +41,11 @@ under the License. ${project.version} provided - org.apache.flink flink-connector-elasticsearch2_${scala.binary.version} ${project.version} - - - org.apache.flink - flink-connector-elasticsearch-base_${scala.binary.version} - ${project.version} - - - - org.elasticsearch - elasticsearch - - - - - - org.elasticsearch - elasticsearch - 2.3.5 - @@ -74,26 +55,18 @@ under the License. maven-shade-plugin 3.0.0 - package shade - true + Elasticsearch2SinkExample com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - - - org.apache.flink.streaming.tests.Elasticsearch2SinkExample - - *:* @@ -104,27 +77,11 @@ under the License. - - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - 1.7 - - - rename - package - - run - - - - - + + + org.apache.flink.streaming.tests.Elasticsearch2SinkExample + + diff --git a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java index 4ec03aa077281..f7532b1a8d6bb 100644 --- a/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch2-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch2SinkExample.java @@ -44,9 +44,9 @@ public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index --type "); + "Usage: --numRecords --index --type "); return; } @@ -54,12 +54,13 @@ public static void main(String[] args) throws Exception { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream source = env.generateSequence(0, 20).map(new MapFunction() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); + DataStream source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); Map userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml index 5b03a7f9d18b9..05a621f5d8a14 100644 --- a/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml @@ -20,15 +20,17 @@ under the License. + + 4.0.0 + - flink-end-to-end-tests org.apache.flink + flink-end-to-end-tests 1.6-SNAPSHOT .. - 4.0.0 - flink-elasticsearch5-test_${scala.binary.version} + flink-elasticsearch5-test flink-elasticsearch5-test jar @@ -39,45 +41,11 @@ under the License. ${project.version} provided - org.apache.flink flink-connector-elasticsearch5_${scala.binary.version} ${project.version} - - - org.apache.flink - flink-connector-elasticsearch-base_${scala.binary.version} - ${project.version} - - - - org.elasticsearch - elasticsearch - - - - - - - org.elasticsearch.client - transport - 5.1.2 - - - - - - org.apache.logging.log4j - log4j-to-slf4j - 2.7 - @@ -87,26 +55,18 @@ under the License. maven-shade-plugin 3.0.0 - package shade - true + Elasticsearch5SinkExample com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - - - org.apache.flink.streaming.tests.Elasticsearch5SinkExample - - *:* @@ -117,27 +77,11 @@ under the License. - - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - 1.7 - - - rename - package - - run - - - - - + + + org.apache.flink.streaming.tests.Elasticsearch5SinkExample + + diff --git a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java index 285f9020d7c78..39808f6fd4d8c 100644 --- a/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java +++ b/flink-end-to-end-tests/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java @@ -40,13 +40,14 @@ * End to end test for Elasticsearch5Sink. */ public class Elasticsearch5SinkExample { + public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); - if (parameterTool.getNumberOfParameters() < 2) { + if (parameterTool.getNumberOfParameters() < 3) { System.out.println("Missing parameters!\n" + - "Usage: --index --type "); + "Usage: --numRecords --index --type "); return; } @@ -54,12 +55,13 @@ public static void main(String[] args) throws Exception { env.getConfig().disableSysoutLogging(); env.enableCheckpointing(5000); - DataStream source = env.generateSequence(0, 20).map(new MapFunction() { - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); + DataStream source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1) + .map(new MapFunction() { + @Override + public String map(Long value) throws Exception { + return "message #" + value; + } + }); Map userConfig = new HashMap<>(); userConfig.put("cluster.name", "elasticsearch"); diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 289868282ec1a..0ec34926fa4ef 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -158,7 +158,28 @@ if [ $EXIT_CODE == 0 ]; then fi if [ $EXIT_CODE == 0 ]; then - run_test "stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4" + run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v1.7.1) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v2.3.5) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" + EXIT_CODE=$? +fi + +if [ $EXIT_CODE == 0 ]; then + run_test \ + "Elasticsearch (v5.1.2) sink end-to-end test" \ + "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz" EXIT_CODE=$? fi diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh index 3fda3444cf7f9..0ef6d558d41a9 100644 --- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh +++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh @@ -20,15 +20,32 @@ set -o pipefail if [[ -z $TEST_DATA_DIR ]]; then - echo "Must run common.sh before kafka-common.sh." + echo "Must run common.sh before elasticsearch-common.sh." exit 1 fi +function setup_elasticsearch { + mkdir -p $TEST_DATA_DIR + + local downloadUrl=$1 + + # start downloading Elasticsearch + echo "Downloading Elasticsearch from $downloadUrl ..." + curl "$downloadUrl" > $TEST_DATA_DIR/elasticsearch.tar.gz + + local elasticsearchDir=$TEST_DATA_DIR/elasticsearch + mkdir -p $elasticsearchDir + tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $elasticsearchDir --strip-components=1 + + # start Elasticsearch cluster + $elasticsearchDir/bin/elasticsearch & +} + function verify_elasticsearch_process_exist { - ELASTICSEARCH_PROCESS=$(jps | grep Elasticsearch | awk '{print $2}') + local elasticsearchProcess=$(jps | grep Elasticsearch | awk '{print $2}') # make sure the elasticsearch node is actually running - if [ "$ELASTICSEARCH_PROCESS" != "Elasticsearch" ]; then + if [ "$elasticsearchProcess" != "Elasticsearch" ]; then echo "Elasticsearch node is not running." PASS="" exit 1 @@ -38,25 +55,26 @@ function verify_elasticsearch_process_exist { } function verify_result { + local numRecords=$1 + if [ -f "$TEST_DATA_DIR/output" ]; then rm $TEST_DATA_DIR/output fi - curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + while : ; do + curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output - if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then - echo "Elasticsearch end to end test pass." - else - echo "Elasticsearch end to end test failed." - PASS="" - exit 1 - fi + if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then + echo "Elasticsearch end to end test pass." + break + else + echo "Waiting for Elasticsearch records ..." + sleep 1 + fi + done } function shutdown_elasticsearch_cluster { pid=$(jps | grep Elasticsearch | awk '{print $1}') - kill -SIGTERM $pid - - # make sure to run regular cleanup as well - cleanup + kill -9 $pid } diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh new file mode 100755 index 0000000000000..78ea283028c35 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh +source "$(dirname "$0")"/elasticsearch-common.sh + +ELASTICSEARCH_VERSION=$1 +DOWNLOAD_URL=$2 + +mkdir -p $TEST_DATA_DIR + +setup_elasticsearch $DOWNLOAD_URL +verify_elasticsearch_process_exist + +start_cluster + +function test_cleanup { + shutdown_elasticsearch_cluster + + # make sure to run regular cleanup as well + cleanup +} + +trap test_cleanup INT +trap test_cleanup EXIT + +TEST_ES_JAR=$TEST_DATA_DIR/../../flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \ + --numRecords 20 \ + --index index \ + --type type + +verify_result 20 diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh deleted file mode 100755 index dea3f13103d31..0000000000000 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch125.sh +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/elasticsearch-common.sh - -mkdir -p $TEST_DATA_DIR - -ELASTICSEARCH1_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz" -ELASTICSEARCH2_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz" -ELASTICSEARCH5_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz" - -# start downloading elasticsearch1 -echo "Downloading Elasticsearch1 from $ELASTICSEARCH1_URL" -curl "$ELASTICSEARCH1_URL" > $TEST_DATA_DIR/elasticsearch1.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch1.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH1_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1 - -# start elasticsearch1 cluster -$ELASTICSEARCH1_DIR/bin/elasticsearch -daemon - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES1_JAR=$TEST_DATA_DIR/../../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES1_JAR \ - --index index \ - --type type - -verify_result - -shutdown_elasticsearch_cluster - -mkdir -p $TEST_DATA_DIR - -# start downloading elasticsearch2 -echo "Downloading Elasticsearch2 from $ELASTICSEARCH2_URL" -curl "$ELASTICSEARCH2_URL" > $TEST_DATA_DIR/elasticsearch2.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch2.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH2_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 - -# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell. -nohup $ELASTICSEARCH2_DIR/bin/elasticsearch & - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES2_JAR=$TEST_DATA_DIR/../../flink-elasticsearch2-test/target/Elasticsearch2SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES2_JAR \ - --index index \ - --type type - -verify_result - -shutdown_elasticsearch_cluster - -mkdir -p $TEST_DATA_DIR - -# start downloading elasticsearch5 -echo "Downloading Elasticsearch5 from $ELASTICSEARCH5_URL" -curl "$ELASTICSEARCH5_URL" > $TEST_DATA_DIR/elasticsearch5.tar.gz - -tar xzf $TEST_DATA_DIR/elasticsearch5.tar.gz -C $TEST_DATA_DIR/ -ELASTICSEARCH5_DIR=$TEST_DATA_DIR/elasticsearch-5.1.2 - -# start elasticsearch cluster, different from elasticsearch1 since using -daemon here will hang the shell. -nohup $ELASTICSEARCH5_DIR/bin/elasticsearch & - -verify_elasticsearch_process_exist - -start_cluster - -TEST_ES5_JAR=$TEST_DATA_DIR/../../flink-elasticsearch5-test/target/Elasticsearch5SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES5_JAR \ - --index index \ - --type type - -verify_result - -rm -rf $FLINK_DIR/log/* 2> /dev/null - -trap shutdown_elasticsearch_cluster INT -trap shutdown_elasticsearch_cluster EXIT