Skip to content

Commit

Permalink
[FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink
Browse files Browse the repository at this point in the history
This closes apache#6927.
  • Loading branch information
yanghua authored and pnowojski committed Nov 15, 2018
1 parent 6f9aa83 commit 4986961
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 200 deletions.
17 changes: 16 additions & 1 deletion flink-end-to-end-tests/flink-sql-client-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ under the License.
<classifier>sql-jar</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
Expand All @@ -106,7 +114,7 @@ under the License.
as we neither access nor package the kafka dependencies -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
<version>2.0.0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -176,6 +184,13 @@ under the License.
<classifier>sql-jar</classifier>
<type>jar</type>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<type>jar</type>
</artifactItem>
<!-- This SQL JAR is not used for now to avoid dependency conflicts; see FLINK-10107.
<artifactItem>
<groupId>org.apache.flink</groupId>
Expand Down
2 changes: 2 additions & 0 deletions flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"

run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"

run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh"

Expand Down
101 changes: 101 additions & 0 deletions flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

KAFKA_CONNECTOR_VERSION="$1"
KAFKA_VERSION="$2"
CONFLUENT_VERSION="$3"
CONFLUENT_MAJOR_VERSION="$4"

source "$(dirname "$0")"/kafka-common.sh $2 $3 $4

function create_kafka_json_source {
topicName="$1"
create_kafka_topic 1 1 $topicName

# put JSON data into Kafka
echo "Sending messages to Kafka..."

send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
}

function get_kafka_json_source_schema {
topicName="$1"
tableName="$2"
cat << EOF
- name: $tableName
type: source-table
update-mode: append
schema:
- name: rowtime
type: TIMESTAMP
rowtime:
timestamps:
type: from-field
from: timestamp
watermarks:
type: periodic-bounded
delay: 2000
- name: user
type: VARCHAR
- name: event
type: ROW<type VARCHAR, message VARCHAR>
connector:
type: kafka
version: "$KAFKA_CONNECTOR_VERSION"
topic: $topicName
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: localhost:2181
- key: bootstrap.servers
value: localhost:9092
format:
type: json
json-schema: >
{
"type": "object",
"properties": {
"timestamp": {
"type": "string"
},
"user": {
"type": ["string", "null"]
},
"event": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"message": {
"type": "string"
}
}
}
}
}
EOF
}
Loading

0 comments on commit 4986961

Please sign in to comment.