Skip to content

Commit

Permalink
[FLINK-8989] [e2e] Cleanup / improve Elasticsearch e2e tests
Browse files Browse the repository at this point in the history
- Rework e2e test job modules to have correct Maven POM
- Parameterize num of records to write to Elasticsearch
- Parameterize Elasticsearch download URL and version in test script
- Improve robustness of test
- Move more Elasticsearch functionality to elasticsearch-common.sh

This closes apache#5761.
  • Loading branch information
tzulitai committed May 22, 2018
1 parent d5bb60d commit 71095dc
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 305 deletions.
45 changes: 10 additions & 35 deletions flink-end-to-end-tests/flink-elasticsearch1-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ under the License.
xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-end-to-end-tests</artifactId>
<version>1.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>flink-elasticsearch1-test_${scala.binary.version}</artifactId>
<artifactId>flink-elasticsearch1-test</artifactId>
<name>flink-elasticsearch1-test</name>
<packaging>jar</packaging>

Expand All @@ -41,7 +41,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch_${scala.binary.version}</artifactId>
Expand All @@ -56,26 +55,18 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Elasticsearch1Sink end to end example -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<finalName>Elasticsearch1SinkExample</finalName>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
Expand All @@ -86,27 +77,11 @@ under the License.
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch1.sh scripts-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-elasticsearch1-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch1SinkExample.jar" />
</target>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.tests.Elasticsearch1SinkExample</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,28 @@
* End to end test for Elasticsearch1Sink.
*/
public class Elasticsearch1SinkExample {

public static void main(String[] args) throws Exception {

final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!\n" +
"Usage: --index <index> --type <type>");
"Usage: --numRecords <numRecords> --index <index> --type <type>");
return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);

DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message # " + value;
}
});
DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message # " + value;
}
});

Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "elasticsearch");
Expand Down
65 changes: 11 additions & 54 deletions flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ under the License.
<project xmlns="http:https://maven.apache.org/POM/4.0.0"
xmlns:xsi="http:https://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http:https://maven.apache.org/POM/4.0.0 http:https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<artifactId>flink-end-to-end-tests</artifactId>
<version>1.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
<artifactId>flink-elasticsearch2-test</artifactId>
<name>flink-elasticsearch2-test</name>
<packaging>jar</packaging>

Expand All @@ -40,31 +41,11 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<!-- Remove elasticsearch1.7.1 -->
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.3.5</version>
</dependency>
</dependencies>

<build>
Expand All @@ -74,26 +55,18 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Elasticsearch2Sink end to end example -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
<finalName>Elasticsearch2SinkExample</finalName>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
Expand All @@ -104,27 +77,11 @@ under the License.
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>

<!--simplify the name of the testing JAR for referring to it in test_streaming_elasticsearch2.sh scripts-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-elasticsearch2-test_${scala.binary.version}-${project.version}.jar" tofile="${project.basedir}/target/Elasticsearch2SinkExample.jar" />
</target>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.tests.Elasticsearch2SinkExample</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,23 @@ public static void main(String[] args) throws Exception {

final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!\n" +
"Usage: --index <index> --type <type>");
"Usage: --numRecords --index <index> --type <type>");
return;
}

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.enableCheckpointing(5000);

DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message #" + value;
}
});
DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message #" + value;
}
});

Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "elasticsearch");
Expand Down
Loading

0 comments on commit 71095dc

Please sign in to comment.