[FLINK-18086][e2e] Migrate SQLClientKafkaITCase to use DDL and new options to create tables

This closes apache#12657
wuchong committed Jun 16, 2020
1 parent 7ce45b5 commit 06d96b5
Showing 6 changed files with 163 additions and 258 deletions.
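For context: the "new options" in the commit title are the flat connector option keys introduced by FLIP-122. Before this commit the test described its tables through a SQL client YAML session file (kafka_json_source_schema.yaml, deleted here) built on the old connector.type-style property keys; afterwards it submits plain DDL using the new keys. A minimal sketch of the two option styles, assuming the universal Kafka connector — the table names, topic, and broker address are hypothetical:

-- Old-style property keys (pre-FLIP-122), shown only for contrast; hypothetical values:
CREATE TABLE LegacyOptionsTable (
    `user` STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test-json',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

-- New-style option keys, as used in kafka_e2e.sql below:
CREATE TABLE NewOptionsTable (
    `user` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);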
17 changes: 0 additions & 17 deletions flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -109,14 +109,6 @@ under the License.
<classifier>sql-jar</classifier>
<scope>test</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<scope>test</scope>
</dependency>
<dependency>
<!-- Used by maven-dependency-plugin -->
<groupId>org.apache.flink</groupId>
@@ -191,15 +183,6 @@ under the License.
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<classifier>sql-jar</classifier>
<destFileName>json.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka-0.10_${scala.binary.version}</artifactId>
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.tests.util.kafka;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -28,11 +29,8 @@
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
import org.apache.flink.tests.util.flink.SQLJobSubmission;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;

import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -52,7 +50,6 @@
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -74,20 +71,27 @@ public class SQLClientKafkaITCase extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(SQLClientKafkaITCase.class);

private static final String KAFKA_JSON_SOURCE_SCHEMA_YAML = "kafka_json_source_schema.yaml";
private static final String KAFKA_E2E_SQL = "kafka_e2e.sql";

@Parameterized.Parameters(name = "{index}: kafka-version:{1} kafka-sql-version:{2}")
@Parameterized.Parameters(name = "{index}: kafka-version:{0} kafka-sql-version:{1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{"0.10.2.2", "0.10", ".*kafka-0.10.jar"},
{"0.11.0.2", "0.11", ".*kafka-0.11.jar"},
{"2.4.1", "universal", ".*kafka.jar"}
{"0.10.2.2", "0.10", "kafka-0.10", ".*kafka-0.10.jar"},
{"0.11.0.2", "0.11", "kafka-0.11", ".*kafka-0.11.jar"},
{"2.4.1", "universal", "kafka", ".*kafka.jar"}
});
}

private static Configuration getConfiguration() {
// we have to enable checkpointing to trigger flushing for the filesystem sink
// (part files are only committed on checkpoint completion)
final Configuration flinkConfig = new Configuration();
flinkConfig.setString("execution.checkpointing.interval", "5s");
return flinkConfig;
}

@Rule
public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory()
.create(FlinkResourceSetup.builder().build());
.create(FlinkResourceSetup.builder().addConfiguration(getConfiguration()).build());

@Rule
public final KafkaResource kafka;
@@ -97,22 +101,22 @@ public static Collection<Object[]> data() {

private final String kafkaVersion;
private final String kafkaSQLVersion;
private final String kafkaIdentifier;
private Path result;
private Path sqlClientSessionConf;

@ClassRule
public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();

private static final Path sqlAvroJar = TestUtils.getResourceJar(".*avro.jar");
private static final Path sqlJsonJar = TestUtils.getResourceJar(".*json.jar");
private static final Path sqlToolBoxJar = TestUtils.getResourceJar(".*SqlToolbox.jar");
private final List<Path> apacheAvroJars = new ArrayList<>();
private final Path sqlConnectorKafkaJar;

public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) {
public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaIdentifier, String kafkaSQLJarPattern) {
this.kafka = KafkaResource.get(kafkaVersion);
this.kafkaVersion = kafkaVersion;
this.kafkaSQLVersion = kafkaSQLVersion;
this.kafkaIdentifier = kafkaIdentifier;

this.sqlConnectorKafkaJar = TestUtils.getResourceJar(kafkaSQLJarPattern);
}
@@ -122,7 +126,6 @@ public void before() throws Exception {
DOWNLOAD_CACHE.before();
Path tmpPath = tmp.getRoot().toPath();
LOG.info("The current temporary path: {}", tmpPath);
this.sqlClientSessionConf = tmpPath.resolve("sql-client-session.conf");
this.result = tmpPath.resolve("result");

apacheAvroJars.add(DOWNLOAD_CACHE.getOrDownload("https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar", tmpPath));
@@ -138,14 +141,14 @@ public void testKafka() throws Exception {
String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString();
kafka.createTopic(1, 1, testJsonTopic);
String[] messages = new String[]{
"{\"timestamp\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"timestamp\": \"2018-03-12T08:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"timestamp\": \"2018-03-12T09:00:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is another warning.\"}}",
"{\"timestamp\": \"2018-03-12T09:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a info.\"}}",
"{\"timestamp\": \"2018-03-12T09:20:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"timestamp\": \"2018-03-12T09:30:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"timestamp\": \"2018-03-12T09:30:00Z\", \"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a bad message because the user is missing.\"}}",
"{\"timestamp\": \"2018-03-12T10:40:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an error.\"}}"
"{\"rowtime\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12T08:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12T09:00:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is another warning.\"}}",
"{\"rowtime\": \"2018-03-12T09:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a info.\"}}",
"{\"rowtime\": \"2018-03-12T09:20:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12T09:30:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12T09:30:00Z\", \"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a bad message because the user is missing.\"}}",
"{\"rowtime\": \"2018-03-12T10:40:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an error.\"}}"
};
kafka.sendMessages(testJsonTopic, messages);

@@ -154,23 +157,15 @@ public void testKafka() throws Exception {

// Initialize the SQL client session configuration file
Map<String, String> varsMap = new HashMap<>();
varsMap.put("$TABLE_NAME", "JsonSourceTable");
varsMap.put("$KAFKA_SQL_VERSION", this.kafkaSQLVersion);
varsMap.put("$TOPIC_NAME", testJsonTopic);
varsMap.put("$KAFKA_IDENTIFIER", this.kafkaIdentifier);
varsMap.put("$TOPIC_JSON_NAME", testJsonTopic);
varsMap.put("$TOPIC_AVRO_NAME", testAvroTopic);
varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
varsMap.put("$KAFKA_ZOOKEEPER_ADDRESS", kafka.getZookeeperAddress().toString());
varsMap.put("$KAFKA_BOOTSTRAP_SERVERS", StringUtils.join(kafka.getBootstrapServerAddresses().toArray(), ","));
String schemaContent = initializeSessionYaml(varsMap);
Files.write(this.sqlClientSessionConf,
schemaContent.getBytes(Charsets.UTF_8),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE);

// Executing SQL, redirect the data from Kafka JSON to Kafka Avro.
insertIntoAvroTable(clusterController);
List<String> sqlLines = initializeSqlLines(varsMap);

// Executing SQL, redirect the data from Kafka Avro to CSV sink.
insertIntoCsvSinkTable(clusterController);
// execute sql statements in "kafka_e2e.sql" file
executeSqlStatements(clusterController, sqlLines);

// Wait until all the results flushed to the CSV file.
LOG.info("Verify the CSV result.");
@@ -179,72 +174,44 @@ public void testKafka() throws Exception {
}
}

private void insertIntoAvroTable(ClusterController clusterController) throws IOException {
LOG.info("Executing SQL: Kafka {} JSON -> Kafka {} Avro", kafkaSQLVersion, kafkaSQLVersion);
String sqlStatement1 = "INSERT INTO AvroBothTable\n" +
" SELECT\n" +
" CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,\n" +
" user,\n" +
" RegReplace(event.message, ' is ', ' was ') AS message,\n" +
" COUNT(*) AS duplicate_count\n" +
" FROM JsonSourceTable\n" +
" WHERE user IS NOT NULL\n" +
" GROUP BY\n" +
" user,\n" +
" event.message,\n" +
" TUMBLE(rowtime, INTERVAL '1' HOUR)";

clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement1)
.addJar(sqlAvroJar)
.addJars(apacheAvroJars)
.addJar(sqlJsonJar)
.addJar(sqlConnectorKafkaJar)
.addJar(sqlToolBoxJar)
.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
.build());
}

private void insertIntoCsvSinkTable(ClusterController clusterController) throws IOException {
LOG.info("Executing SQL: Kafka {} Avro -> Csv sink", kafkaSQLVersion);
String sqlStatement2 = "INSERT INTO CsvSinkTable\n" +
" SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant\n" +
" FROM AvroBothTable";

clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlStatement2)
.addJar(sqlAvroJar)
.addJars(apacheAvroJars)
.addJar(sqlJsonJar)
.addJar(sqlConnectorKafkaJar)
.addJar(sqlToolBoxJar)
.setSessionEnvFile(this.sqlClientSessionConf.toAbsolutePath().toString())
.build()
);
private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) throws IOException {
LOG.info("Executing Kafka {} end-to-end SQL statements.", kafkaSQLVersion);
clusterController.submitSQLJob(new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlAvroJar)
.addJars(apacheAvroJars)
.addJar(sqlConnectorKafkaJar)
.addJar(sqlToolBoxJar)
.build());
}

private String initializeSessionYaml(Map<String, String> vars) throws IOException {
URL url = SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_JSON_SOURCE_SCHEMA_YAML);
private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
URL url = SQLClientKafkaITCase.class.getClassLoader().getResource(KAFKA_E2E_SQL);
if (url == null) {
throw new FileNotFoundException(KAFKA_JSON_SOURCE_SCHEMA_YAML);
throw new FileNotFoundException(KAFKA_E2E_SQL);
}

String schema = FileUtils.readFileUtf8(new File(url.getFile()));
for (Map.Entry<String, String> var : vars.entrySet()) {
schema = schema.replace(var.getKey(), var.getValue());
List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
List<String> result = new ArrayList<>();
for (String line : lines) {
for (Map.Entry<String, String> var : vars.entrySet()) {
line = line.replace(var.getKey(), var.getValue());
}
result.add(line);
}
return schema;

return result;
}

private void checkCsvResultFile() throws Exception {
boolean success = false;
final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
while (!success && deadline.hasTimeLeft()) {
while (deadline.hasTimeLeft()) {
if (Files.exists(result)) {
byte[] bytes = Files.readAllBytes(result);
String[] lines = new String(bytes, Charsets.UTF_8).split("\n");
if (lines.length == 4) {
List<String> lines = readCsvResultFiles(result);
if (lines.size() == 4) {
success = true;
assertThat(
lines,
lines.toArray(new String[0]),
arrayContainingInAnyOrder(
"2018-03-12 08:00:00.000,Alice,This was a warning.,2,Success constant folding.",
"2018-03-12 09:00:00.000,Bob,This was another warning.,1,Success constant folding.",
@@ -253,6 +220,9 @@ private void checkCsvResultFile() throws Exception {
)
);
break;
} else {
LOG.info("The target CSV {} does not contain enough records, current {} records, left time: {}s",
result, lines.size(), deadline.timeLeft().getSeconds());
}
} else {
LOG.info("The target CSV {} does not exist now", result);
@@ -261,4 +231,17 @@
}
Assert.assertTrue("Did not get expected results before timeout.", success);
}

private static List<String> readCsvResultFiles(Path path) throws IOException {
File filePath = path.toFile();
// list all the non-hidden files (the filesystem sink writes hidden dot-prefixed
// in-progress files alongside committed part files)
File[] csvFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
List<String> result = new ArrayList<>();
if (csvFiles != null) {
for (File file : csvFiles) {
result.addAll(Files.readAllLines(file.toPath()));
}
}
return result;
}
}
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/resources/kafka_e2e.sql
@@ -0,0 +1,75 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

CREATE TABLE JsonTable (
rowtime TIMESTAMP(3),
`user` STRING,
event ROW<`type` STRING, message STRING>,
WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (
'connector' = '$KAFKA_IDENTIFIER',
'topic' = '$TOPIC_JSON_NAME',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '$KAFKA_BOOTSTRAP_SERVERS',
'format' = 'json'
);

CREATE TABLE AvroTable (
event_timestamp STRING,
`user` STRING,
message STRING,
duplicate_count BIGINT
) WITH (
'connector' = '$KAFKA_IDENTIFIER',
'topic' = '$TOPIC_AVRO_NAME',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '$KAFKA_BOOTSTRAP_SERVERS',
'format' = 'avro'
);

CREATE TABLE CsvTable (
event_timestamp STRING,
`user` STRING,
message STRING,
duplicate_count BIGINT,
constant STRING
) WITH (
'connector' = 'filesystem',
'path' = '$RESULT',
'sink.rolling-policy.rollover-interval' = '2s',
'sink.rolling-policy.check-interval' = '2s',
'format' = 'csv',
'csv.disable-quote-character' = 'true'
);

CREATE FUNCTION RegReplace AS 'org.apache.flink.table.toolbox.StringRegexReplaceFunction';

INSERT INTO AvroTable -- read from Kafka Json, window aggregation, and write into Kafka Avro
SELECT
CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
`user`,
RegReplace(event.message, ' is ', ' was ') AS message,
COUNT(*) AS duplicate_count
FROM JsonTable
WHERE `user` IS NOT NULL
GROUP BY
`user`,
event.message,
TUMBLE(rowtime, INTERVAL '1' HOUR);

INSERT INTO CsvTable -- read from Kafka Avro, and write into Filesystem Csv
SELECT AvroTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant
FROM AvroTable;
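
To make the placeholder substitution concrete: initializeSqlLines replaces every $VARIABLE occurrence line by line, so under the "universal" parameterization ($KAFKA_IDENTIFIER = "kafka", per data() above) the JSON source DDL that actually reaches the SQL client looks roughly like this — the topic suffix and broker address below are hypothetical:

CREATE TABLE JsonTable (
    rowtime TIMESTAMP(3),
    `user` STRING,
    event ROW<`type` STRING, message STRING>,
    WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (
    'connector' = 'kafka',                              -- $KAFKA_IDENTIFIER
    'topic' = 'test-json-2.4.1-<random-uuid>',          -- $TOPIC_JSON_NAME (hypothetical suffix)
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',  -- $KAFKA_BOOTSTRAP_SERVERS (hypothetical address)
    'format' = 'json'
);

A note on why checkCsvResultFile expects exactly 4 rows: the [08:00, 09:00) window aggregates Alice's two identical warnings into one row (duplicate_count 2); the [09:00, 10:00) window yields three rows (Bob's warning, Alice's info, Steve's two identical infos); the null-user record is dropped by WHERE `user` IS NOT NULL; and the [10:00, 11:00) window holding Bob's 10:40 error never fires, since the watermark trails the last record by 2 seconds and never reaches 11:00.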
(Diffs for the remaining 3 changed files are not shown.)
