From 1827e4dddfbac75a533ff2aea2f3e690777a3e5e Mon Sep 17 00:00:00 2001
From: Leonard Xu
Date: Wed, 25 Mar 2020 10:13:41 +0800
Subject: [PATCH] [FLINK-16170][elasticsearch] Fix SearchTemplateRequest
 ClassNotFoundException when using flink-sql-connector-elasticsearch7

We shouldn't exclude `org.elasticsearch:elasticsearch-geo` and
`org.elasticsearch.plugin:lang-mustache-client` when shading.

This closes #11396
---
 .../pom.xml                                   | 16 ++---
 flink-connectors/pom.xml                      |  1 +
 .../flink-sql-client-test/pom.xml             | 13 ++++
 flink-end-to-end-tests/run-nightly-tests.sh   |  6 +-
 .../test-scripts/elasticsearch-common.sh      |  2 +-
 .../test-scripts/test_sql_client.sh           | 61 +++++++++++++------
 tools/travis/splits/split_misc.sh             |  6 +-
 7 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
index 0f2da3f87b2e6..d1e289dca6219 100644
--- a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
+++ b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
@@ -63,14 +63,10 @@ under the License.
-						<exclude>com.carrotsearch:hppc</exclude>
 						<exclude>com.tdunning:t-digest</exclude>
 						<exclude>joda-time:joda-time</exclude>
 						<exclude>net.sf.jopt-simple:jopt-simple</exclude>
 						<exclude>org.elasticsearch:jna</exclude>
-						<exclude>org.elasticsearch:elasticsearch-geo</exclude>
-						<exclude>org.elasticsearch.plugin:lang-mustache-client</exclude>
-						<exclude>com.github.spullara.mustache.java:compiler</exclude>
 						<exclude>org.hdrhistogram:HdrHistogram</exclude>
 						<exclude>org.yaml:snakeyaml</exclude>
@@ -135,14 +131,18 @@ under the License.
 						<relocation>
 							<pattern>org.elasticsearch</pattern>
 							<shadedPattern>org.apache.flink.elasticsearch7.shaded.org.elasticsearch</shadedPattern>
 						</relocation>
-						<relocation>
-							<pattern>org.apache.logging</pattern>
-							<shadedPattern>org.apache.flink.elasticsearch7.shaded.org.apache.logging</shadedPattern>
-						</relocation>
 						<relocation>
 							<pattern>com.fasterxml.jackson</pattern>
 							<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.fasterxml.jackson</shadedPattern>
 						</relocation>
+						<relocation>
+							<pattern>com.carrotsearch.hppc</pattern>
+							<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.carrotsearch.hppc</shadedPattern>
+						</relocation>
+						<relocation>
+							<pattern>com.github.mustachejava</pattern>
+							<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.github.mustachejava</shadedPattern>
+						</relocation>
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 1e92a10541ffd..296bf899e4491 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -94,6 +94,7 @@ under the License.
 		<module>flink-sql-connector-elasticsearch6</module>
+		<module>flink-sql-connector-elasticsearch7</module>
 		<module>flink-sql-connector-kafka-0.10</module>
 		<module>flink-sql-connector-kafka-0.11</module>
 		<module>flink-sql-connector-kafka</module>
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index c321c459ed4e5..69dd2b2db4047 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -96,6 +96,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
@@ -191,6 +198,12 @@ under the License.
 						<version>${project.version}</version>
 						<type>jar</type>
 					</artifactItem>
+					<artifactItem>
+						<groupId>org.apache.flink</groupId>
+						<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+						<version>${project.version}</version>
+						<type>jar</type>
+					</artifactItem>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d6a691f44d4af..fc854b6dfe374 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -177,8 +177,10 @@ fi
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
 
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
 
 run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
 run_test "TPC-DS end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpcds.sh"
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 0ef278abf98ed..8a2afcb4ffe2f 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -74,7 +74,7 @@ function verify_result_line_number {
   while : ; do
     curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output || true
 
-    if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
+    if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ] || [ -n "$(grep "\"value\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
       echo "Elasticsearch end to end test pass."
       break
     else
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 8eec6eaf6ed15..54209249b0686 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,12 +20,17 @@ set -Eeuo pipefail
 
 PLANNER="${1:-old}"
+ELASTICSEARCH_VERSION=${2:-6}
 
 KAFKA_VERSION="2.2.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
 KAFKA_SQL_VERSION="universal"
+ELASTICSEARCH6_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
+ELASTICSEARCH7_MAC_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-darwin-x86_64.tar.gz'
+ELASTICSEARCH7_LINUX_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-linux-x86_64.tar.gz'
+
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/kafka_sql_common.sh \
   $KAFKA_VERSION \
@@ -85,6 +90,30 @@ function sql_cleanup() {
 }
 on_exit sql_cleanup
 
+function prepare_elasticsearch {
+  echo "Preparing Elasticsearch(version=$ELASTICSEARCH_VERSION)..."
+  # Elasticsearch provides separate release binaries per operating system since version 7.
+  case "$(uname -s)" in
+    Linux*)  OS_TYPE=linux;;
+    Darwin*) OS_TYPE=mac;;
+    *)       OS_TYPE="UNKNOWN:$(uname -s)"
+  esac
+
+  if [[ "$ELASTICSEARCH_VERSION" == 6 ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH6_DOWNLOAD_URL
+  elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "mac" ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH7_MAC_DOWNLOAD_URL
+  elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "linux" ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH7_LINUX_DOWNLOAD_URL
+  else
+    echo "[ERROR] Unsupported Elasticsearch version ($ELASTICSEARCH_VERSION) for OS: $OS_TYPE"
+    exit 1
+  fi
+
+  setup_elasticsearch $DOWNLOAD_URL
+  wait_elasticsearch_working
+}
+
 # prepare Kafka
 echo "Preparing Kafka..."
@@ -96,13 +125,7 @@ create_kafka_json_source test-json
 create_kafka_topic 1 1 test-avro
 
 # prepare Elasticsearch
-echo "Preparing Elasticsearch..."
-
-ELASTICSEARCH_VERSION=6
-DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
-
-setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION
-wait_elasticsearch_working
+prepare_elasticsearch
 
 ################################################################################
 # Prepare Flink
@@ -121,7 +144,7 @@ echo "Testing SQL statements..."
 
 JSON_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "json" )
 KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )
-ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch6" )
+ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch$ELASTICSEARCH_VERSION" )
 
 # create session environment file
 RESULT=$TEST_DATA_DIR/result
@@ -146,7 +169,7 @@ cat >> $SQL_CONF << EOF
         data-type: BIGINT
     connector:
       type: elasticsearch
-      version: 6
+      version: "$ELASTICSEARCH_VERSION"
       hosts: "http://localhost:9200"
       index: "$ELASTICSEARCH_INDEX"
       document-type: "user"
@@ -167,7 +190,7 @@ cat >> $SQL_CONF << EOF
         data-type: BIGINT
     connector:
      type: elasticsearch
-      version: 6
+      version: "$ELASTICSEARCH_VERSION"
       hosts: "http://localhost:9200"
       index: "$ELASTICSEARCH_INDEX"
       document-type: "user"
@@ -190,7 +213,7 @@ EOF
 
 echo "Executing SQL: Values -> Elasticsearch (upsert)"
 
-SQL_STATEMENT_3=$(cat << EOF
+SQL_STATEMENT_1=$(cat << EOF
 INSERT INTO ElasticsearchUpsertSinkTable
   SELECT user_id, user_name, COUNT(*) AS user_count
   FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
@@ -200,18 +223,20 @@ EOF
 )
 
 JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
-  --library $SQL_JARS_DIR \
+  --jar $KAFKA_SQL_JAR \
+  --jar $JSON_SQL_JAR \
+  --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_1" | grep "Job ID:" | sed 's/.* //g')
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
-verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
+verify_result_line_number 3 "$ELASTICSEARCH_INDEX"
 
 echo "Executing SQL: Values -> Elasticsearch (append, no key)"
 
-SQL_STATEMENT_4=$(cat << EOF
+SQL_STATEMENT_2=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT *
   FROM (
@@ -232,7 +257,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_4" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_2" | grep "Job ID:" | sed 's/.* //g')
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
@@ -241,7 +266,7 @@
 verify_result_line_number 9 "$ELASTICSEARCH_INDEX"
 
 echo "Executing SQL: Match recognize -> Elasticsearch"
 
-SQL_STATEMENT_5=$(cat << EOF
+SQL_STATEMENT_3=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count
   FROM (
@@ -265,7 +290,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_5" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
 
 # 3 upsert results and 6 append results and 3 match_recognize results
 verify_result_line_number 12 "$ELASTICSEARCH_INDEX"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index dfc24949655e1..c671a82c3a096 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -72,8 +72,10 @@ fi
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
 
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
 
 run_test "Dependency shading of table modules test" "$END_TO_END_DIR/test-scripts/test_table_shaded_dependencies.sh"
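
A quick local sanity check of the shading change, independent of the e2e tests above (a sketch, not part of the patch: the jar path and version suffix below are assumptions that depend on your local build): after building the module, the previously excluded lang-mustache and mustache compiler classes should appear in the shaded jar under the relocation prefixes configured above.

# Hypothetical path; adjust to your build output and Scala version.
SHADED_JAR=$(ls flink-connectors/flink-sql-connector-elasticsearch7/target/flink-sql-connector-elasticsearch7_*.jar)

# SearchTemplateRequest comes from org.elasticsearch.plugin:lang-mustache-client and should
# now be bundled, relocated by the existing org.elasticsearch relocation.
jar tf "$SHADED_JAR" | grep "org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest"

# The mustache compiler classes should land under the new com.github.mustachejava relocation.
jar tf "$SHADED_JAR" | grep "org/apache/flink/elasticsearch7/shaded/com/github/mustachejava"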