Skip to content

Commit

Permalink
Run tests in parallel
Browse files Browse the repository at this point in the history
This uses the forkCount parameter of surefire/failsafe. This also
changes the JVM memory size for the testing VMs so that more can fit
into memory.

For this to work, NepheleMiniCluster must recognize when it is run in
a parallel testing environment and change ports accordingly. Also
AbstractTestBase and TaskTestBase must prefix temp files with the class
name to prevent name clashes.

This closes apache#96
  • Loading branch information
aljoscha authored and StephanEwen committed Aug 18, 2014
1 parent 094af83 commit b2af293
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 60 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ before_script:

install: true

script: "mvn -B $PROFILE clean verify"
# we have to manually set the forkCount because maven thinks that the travis
# machine has 32 cores
script: "mvn -Dflink.forkCount=4 -B $PROFILE clean verify"

# deploy if the first job is successful; should be replaced by an after_all_success if travis finally supports it
after_success:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

public class AvroExternalJarProgramITCase {

private static final int TEST_JM_PORT = 43191;

private static final String JAR_FILE = "target/maven-test-jar.jar";

private static final String TEST_DATA_FILE = "/testdata.avro";
Expand All @@ -49,7 +47,6 @@ public void testExternalProgram() {

try {
testMiniCluster = new NepheleMiniCluster();
testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
testMiniCluster.setTaskManagerNumSlots(4);
testMiniCluster.start();

Expand All @@ -58,7 +55,7 @@ public void testExternalProgram() {

PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration(), program.getUserCodeClassLoader());
Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRpcPort()), new Configuration(), program.getUserCodeClassLoader());
c.run(program, 4, true);
}
catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,24 @@ public JobClient getJobClient(JobGraph jobGraph) throws Exception {
}

public void start() throws Exception {

String forkNumberString = System.getProperty("forkNumber");
int forkNumber = -1;
try {
forkNumber = Integer.parseInt(forkNumberString);
} catch (NumberFormatException e) {
// running inside and IDE, so the forkNumber property is not properly set
// just ignore
}
if (forkNumber != -1) {
// we are running inside a surefire/failsafe test, determine forkNumber and set
// ports accordingly so that we can have multiple parallel instances

jobManagerRpcPort = 1024 + forkNumber * 300;
taskManagerRpcPort = 1024 + forkNumber * 300 + 100;
taskManagerDataPort = 1024 + forkNumber * 300 + 200;
}

synchronized (startStopLock) {
// set up the global configuration
if (this.configDir != null) {
Expand Down
10 changes: 6 additions & 4 deletions flink-core/src/main/java/org/apache/flink/core/fs/Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -497,16 +497,18 @@ public void write(DataOutputView out) throws IOException {

}

public static String constructTestPath(String folder) {
public static String constructTestPath(Class<?> forClass, String folder) {
// we create test path that depends on class to prevent name clashes when two tests
// create temp files with the same name
String path = System.getProperty("java.io.tmpdir");
if (!(path.endsWith("/") || path.endsWith("\\")) ) {
path += System.getProperty("file.separator");
}
path += folder;
path += (forClass.getName() + "-" + folder);
return path;
}

public static String constructTestURI(String folder) {
return new File(constructTestPath(folder)).toURI().toString();
public static String constructTestURI(Class<?> forClass, String folder) {
return new File(constructTestPath(forClass, folder)).toURI().toString();
}
}
37 changes: 16 additions & 21 deletions flink-java8-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ under the License.
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
Expand All @@ -60,17 +60,17 @@ under the License.

<build>
<plugins>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>
<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.8</source>
<target>1.8</target>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
Expand All @@ -89,8 +89,6 @@ under the License.
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
<forkMode>once</forkMode>
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
Expand All @@ -99,9 +97,6 @@ under the License.
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
<forkMode>always</forkMode>
<threadCount>1</threadCount>
<perCoreThreadCount>false</perCoreThreadCount>
</configuration>
</plugin>
</plugins>
Expand Down
4 changes: 0 additions & 4 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ under the License.
<exclude>**/TestData.java</exclude>
<exclude>**/TestInstanceListener.java</exclude>
</excludes>
<forkMode>once</forkMode>
<argLine>-Xms512m -Xmx1024m</argLine>
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
Expand All @@ -118,8 +116,6 @@ under the License.
<excludes>
<exclude>**/TestData.java</exclude>
</excludes>
<forkMode>once</forkMode>
<argLine>-Xms512m -Xmx1024m</argLine>
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DataSinkTaskTest extends TaskTestBase

private static final int NETWORK_BUFFER_SIZE = 1024;

private final String tempTestPath = Path.constructTestPath("dst_test");
private final String tempTestPath = Path.constructTestPath(DataSinkTaskTest.class, "dst_test");

@After
public void cleanUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class DataSourceTaskTest extends TaskTestBase {

private List<Record> outList;

private String tempTestPath = Path.constructTestPath("dst_test");
private String tempTestPath = Path.constructTestPath(DataSourceTaskTest.class, "dst_test");

@After
public void cleanUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public String createTempFile(String fileName, String contents) throws IOExceptio

public File createAndRegisterTempFile(String fileName) throws IOException {
File baseDir = new File(System.getProperty("java.io.tmpdir"));
File f = new File(baseDir, fileName);
File f = new File(baseDir, this.getClass().getName() + "-" + fileName);

if (f.exists()) {
deleteRecursively(f);
Expand Down
5 changes: 0 additions & 5 deletions flink-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ under the License.
<excludes>
<exclude>**/*TestBase*.class</exclude>
</excludes>
<forkMode>once</forkMode>
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
Expand All @@ -126,9 +124,6 @@ under the License.
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
<forkMode>always</forkMode>
<threadCount>1</threadCount>
<perCoreThreadCount>false</perCoreThreadCount>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception {
String pageWithRankInputPath = ""; //"file:https://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file:https://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
String outputPath = Path.constructTestURI("flink_iterations");
String outputPath = Path.constructTestURI(CustomCompensatableDanglingPageRank.class, "flink_iterations");
// String confPath = PlayConstants.PLAY_DIR + "local-conf";
int minorConsumer = 2;
int matchMemory = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception {
String pageWithRankInputPath = ""; //"file:https://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = ""; //"file:https://" + PlayConstants.PLAY_DIR +
// "test-inputs/danglingpagerank/adjacencylists";
String outputPath = Path.constructTestURI("flink_iterations");
String outputPath = Path.constructTestURI(CustomCompensatableDanglingPageRankWithCombiner.class, "flink_iterations");
int minorConsumer = 2;
int matchMemory = 5;
int coGroupSortMemory = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ public class PackagedProgramEndToEndITCase {

@Test
public void testEverything() {
final int PORT = 6498;

NepheleMiniCluster cluster = new NepheleMiniCluster();

File points = null;
File clusters = null;
File outFile = null;
Expand All @@ -68,10 +66,9 @@ public void testEverything() {
// run KMeans
cluster.setNumTaskTracker(2);
cluster.setTaskManagerNumSlots(2);
cluster.setJobManagerRpcPort(PORT);
cluster.start();
RemoteExecutor ex = new RemoteExecutor("localhost", PORT);

RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRpcPort());

ex.executeJar(jarPath,
"org.apache.flink.test.util.testjar.KMeansForTest",
Expand Down
34 changes: 24 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ under the License.
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<hadoop-one.version>1.2.1</hadoop-one.version>
<hadoop-two.version>2.2.0</hadoop-two.version>
<!-- Need to use a user property here because the surefire/failsafe
forkCount is not exposed as a property. With this we can set
it on the "mvn" commandline in travis. -->
<flink.forkCount>1.5C</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
</properties>

<dependencies>
Expand Down Expand Up @@ -311,9 +316,9 @@ under the License.
<activeByDefault>false</activeByDefault>
<jdk>1.8</jdk>
</activation>
<modules>
<module>flink-java8-tests</module>
</modules>
<modules>
<module>flink-java8-tests</module>
</modules>
<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -395,10 +400,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.7</version>
<configuration>
<argLine>-Xmx1024m</argLine>
</configuration>
<version>2.17</version>
</plugin>

<!-- test coverage reports -->
Expand Down Expand Up @@ -544,15 +546,27 @@ under the License.
</execution>
</executions>
<configuration>
<argLine>-Xmx1024m</argLine>
<forkCount>${flink.forkCount}</forkCount>
<reuseForks>${flink.reuseForks}</reuseForks>
<systemPropertyVariables>
<!-- we need the "0" in front here because for some reason
the property is null when we have just the variable-->
<forkNumber>0${surefire.forkNumber}</forkNumber>
</systemPropertyVariables>
<argLine>-Xmx800m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version><!--$NO-MVN-MAN-VER$-->
<version>2.17</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<argLine>-Xmx1024m</argLine>
<forkCount>${flink.forkCount}</forkCount>
<reuseForks>${flink.reuseForks}</reuseForks>
<systemPropertyVariables>
<forkNumber>0${surefire.forkNumber}</forkNumber>
</systemPropertyVariables>
<argLine>-Xmx800m</argLine>
</configuration>
</plugin>
<plugin>
Expand Down

0 comments on commit b2af293

Please sign in to comment.