[FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7

We shouldn't exclude `org.elasticsearch:elasticsearch-geo` and `org.elasticsearch.plugin:lang-mustache-client` when shading.

This closes apache#11396
leonardBang committed Mar 25, 2020
1 parent f6312b8 commit 1827e4d
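
For anyone verifying the fix locally, a minimal, hypothetical check (the jar name below is a placeholder; the actual name depends on the Flink version and Scala suffix of your build): after rebuilding flink-sql-connector-elasticsearch7, the SearchTemplateRequest class from lang-mustache-client and the relocated mustache compiler should both appear in the shaded jar.

cd flink-connectors/flink-sql-connector-elasticsearch7/target
# Placeholder jar name; substitute the version/Scala suffix of the local build.
jar tf flink-sql-connector-elasticsearch7_2.11-1.11-SNAPSHOT.jar \
  | grep -E 'script/mustache/SearchTemplateRequest|com/github/mustachejava'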
Showing 7 changed files with 74 additions and 31 deletions.
16 changes: 8 additions & 8 deletions flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
@@ -63,14 +63,10 @@ under the License.
</includes>
<excludes>
<!-- These dependencies are not required. -->
<exclude>com.carrotsearch:hppc</exclude>
<exclude>com.tdunning:t-digest</exclude>
<exclude>joda-time:joda-time</exclude>
<exclude>net.sf.jopt-simple:jopt-simple</exclude>
<exclude>org.elasticsearch:jna</exclude>
<exclude>org.elasticsearch:elasticsearch-geo</exclude>
<exclude>org.elasticsearch.plugin:lang-mustache-client</exclude>
<exclude>com.github.spullara.mustache.java:compiler</exclude>
<exclude>org.hdrhistogram:HdrHistogram</exclude>
<exclude>org.yaml:snakeyaml</exclude>
</excludes>
@@ -135,14 +131,18 @@ under the License.
<pattern>org.elasticsearch</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.org.elasticsearch</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.logging</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.org.apache.logging</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>com.carrotsearch.hppc</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.carrotsearch.hppc</shadedPattern>
</relocation>
<relocation>
<pattern>com.github.mustachejava</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.github.mustachejava</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
1 change: 1 addition & 0 deletions flink-connectors/pom.xml
@@ -94,6 +94,7 @@ under the License.
</activation>
<modules>
<module>flink-sql-connector-elasticsearch6</module>
<module>flink-sql-connector-elasticsearch7</module>
<module>flink-sql-connector-kafka-0.10</module>
<module>flink-sql-connector-kafka-0.11</module>
<module>flink-sql-connector-kafka</module>
13 changes: 13 additions & 0 deletions flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -96,6 +96,13 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<dependencyManagement>
@@ -191,6 +198,12 @@
<version>${project.version}</version>
<type>jar</type>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>jar</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
6 changes: 4 additions & 2 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -177,8 +177,10 @@ fi
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"

run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"

run_test "TPC-H end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpch.sh"
run_test "TPC-DS end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_tpcds.sh"
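The same four combinations can be exercised locally. A rough sketch, assuming the usual e2e setup (a built Flink distribution plus the flink-end-to-end-tests module); both exported paths are placeholders:

export FLINK_DIR=/path/to/flink/build-target                  # placeholder: built Flink distribution
export END_TO_END_DIR=/path/to/flink/flink-end-to-end-tests   # placeholder: e2e test module
# Mirrors the new "Old planner ... Elasticsearch (v7.5.1)" entry above.
"$END_TO_END_DIR"/test-scripts/test_sql_client.sh old 7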
@@ -74,7 +74,7 @@ function verify_result_line_number {
while : ; do
curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output || true

if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ] || [ -n "$(grep "\"value\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
echo "Elasticsearch end to end test pass."
break
else
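The second grep is needed because the _search response changed shape between the two supported versions: Elasticsearch 6 reports the hit count as a plain number, while Elasticsearch 7 nests it in an object. A small sketch with abridged, made-up response fragments showing which branch matches where:

ES6_FRAGMENT='"hits" : { "total" : 21,'                                   # 6.x style
ES7_FRAGMENT='"hits" : { "total" : { "value" : 21, "relation" : "eq" },'  # 7.x style
grep '"total" : 21' <<< "$ES6_FRAGMENT"   # matched by the first condition
grep '"value" : 21' <<< "$ES7_FRAGMENT"   # matched by the second condition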
61 changes: 43 additions & 18 deletions flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,12 +20,17 @@
set -Eeuo pipefail

PLANNER="${1:-old}"
ELASTICSEARCH_VERSION=${2:-6}

KAFKA_VERSION="2.2.0"
CONFLUENT_VERSION="5.0.0"
CONFLUENT_MAJOR_VERSION="5.0"
KAFKA_SQL_VERSION="universal"

ELASTICSEARCH6_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
ELASTICSEARCH7_MAC_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-darwin-x86_64.tar.gz'
ELASTICSEARCH7_LINUX_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-linux-x86_64.tar.gz'

source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka_sql_common.sh \
$KAFKA_VERSION \
@@ -85,6 +90,30 @@ function sql_cleanup() {
}
on_exit sql_cleanup

function prepare_elasticsearch {
echo "Preparing Elasticsearch(version=$ELASTICSEARCH_VERSION)..."
# Elasticsearch offers OS-specific release binaries since version 7.
case "$(uname -s)" in
Linux*) OS_TYPE=linux;;
Darwin*) OS_TYPE=mac;;
*) OS_TYPE="UNKNOWN:$(uname -s)"
esac

if [[ "$ELASTICSEARCH_VERSION" == 6 ]]; then
DOWNLOAD_URL=$ELASTICSEARCH6_DOWNLOAD_URL
elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "mac" ]]; then
DOWNLOAD_URL=$ELASTICSEARCH7_MAC_DOWNLOAD_URL
elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "linux" ]]; then
DOWNLOAD_URL=$ELASTICSEARCH7_LINUX_DOWNLOAD_URL
else
echo "[ERROR] Unsupported elasticsearch version($ELASTICSEARCH_VERSION) for OS: $OS_TYPE"
exit 1
fi

setup_elasticsearch $DOWNLOAD_URL
wait_elasticsearch_working
}

# prepare Kafka
echo "Preparing Kafka..."

@@ -96,13 +125,7 @@ create_kafka_json_source test-json
create_kafka_topic 1 1 test-avro

# prepare Elasticsearch
echo "Preparing Elasticsearch..."

ELASTICSEARCH_VERSION=6
DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'

setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION
wait_elasticsearch_working
prepare_elasticsearch

################################################################################
# Prepare Flink
@@ -121,7 +144,7 @@ echo "Testing SQL statements..."

JSON_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "json" )
KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )
ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch6" )
ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch$ELASTICSEARCH_VERSION" )

# create session environment file
RESULT=$TEST_DATA_DIR/result
@@ -146,7 +169,7 @@ cat >> $SQL_CONF << EOF
data-type: BIGINT
connector:
type: elasticsearch
version: 6
version: "$ELASTICSEARCH_VERSION"
hosts: "https://localhost:9200"
index: "$ELASTICSEARCH_INDEX"
document-type: "user"
@@ -167,7 +190,7 @@ cat >> $SQL_CONF << EOF
data-type: BIGINT
connector:
type: elasticsearch
version: 6
version: "$ELASTICSEARCH_VERSION"
hosts: "https://localhost:9200"
index: "$ELASTICSEARCH_INDEX"
document-type: "user"
@@ -190,7 +213,7 @@ EOF

echo "Executing SQL: Values -> Elasticsearch (upsert)"

SQL_STATEMENT_3=$(cat << EOF
SQL_STATEMENT_1=$(cat << EOF
INSERT INTO ElasticsearchUpsertSinkTable
SELECT user_id, user_name, COUNT(*) AS user_count
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
@@ -200,18 +223,20 @@ EOF
)

JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
--library $SQL_JARS_DIR \
--jar $KAFKA_SQL_JAR \
--jar $JSON_SQL_JAR \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
--update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
--update "$SQL_STATEMENT_1" | grep "Job ID:" | sed 's/.* //g')

wait_job_terminal_state "$JOB_ID" "FINISHED"

verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
verify_result_line_number 3 "$ELASTICSEARCH_INDEX"

echo "Executing SQL: Values -> Elasticsearch (append, no key)"

SQL_STATEMENT_4=$(cat << EOF
SQL_STATEMENT_2=$(cat << EOF
INSERT INTO ElasticsearchAppendSinkTable
SELECT *
FROM (
@@ -232,7 +257,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
--update "$SQL_STATEMENT_4" | grep "Job ID:" | sed 's/.* //g')
--update "$SQL_STATEMENT_2" | grep "Job ID:" | sed 's/.* //g')

wait_job_terminal_state "$JOB_ID" "FINISHED"

@@ -241,7 +266,7 @@ verify_result_line_number 9 "$ELASTICSEARCH_INDEX"

echo "Executing SQL: Match recognize -> Elasticsearch"

SQL_STATEMENT_5=$(cat << EOF
SQL_STATEMENT_3=$(cat << EOF
INSERT INTO ElasticsearchAppendSinkTable
SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count
FROM (
@@ -265,7 +290,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
--update "$SQL_STATEMENT_5" | grep "Job ID:" | sed 's/.* //g')
--update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')

# 3 upsert results and 6 append results and 3 match_recognize results
verify_result_line_number 12 "$ELASTICSEARCH_INDEX"
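
Once all three jobs have finished, the index should contain the twelve documents counted above. A quick manual spot check, assuming Elasticsearch is still running on localhost:9200 and ELASTICSEARCH_INDEX still names the test index (this simply repeats what verify_result_line_number greps for):

curl -s "localhost:9200/$ELASTICSEARCH_INDEX/_search?q=*&size=0&pretty" \
  | grep -E '"total" : 12|"value" : 12'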
6 changes: 4 additions & 2 deletions tools/travis/splits/split_misc.sh
@@ -72,8 +72,10 @@ fi
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"

run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"

run_test "Dependency shading of table modules test" "$END_TO_END_DIR/test-scripts/test_table_shaded_dependencies.sh"

