Skip to content

Commit

Permalink
[tests] Cleanup sysout logging in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Apr 13, 2015
1 parent e79813b commit 69a400f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -37,17 +39,19 @@ public class DegreesWithExceptionITCase {
private static final int PARALLELISM = 4;

private static ForkableFlinkMiniCluster cluster;

@BeforeClass
public static void suppressOutput() {
TestGraphUtils.pipeSystemOutToNull();
}


@BeforeClass
public static void setupCluster() {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
cluster = new ForkableFlinkMiniCluster(config, false);
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
cluster = new ForkableFlinkMiniCluster(config, false);
}
catch (Exception e) {
e.printStackTrace();
fail("Error starting test cluster: " + e.getMessage());
}
}

@AfterClass
Expand All @@ -56,7 +60,6 @@ public static void tearDownCluster() {
cluster.stop();
}
catch (Throwable t) {
System.err.println("Error stopping cluster on shutdown");
t.printStackTrace();
fail("Cluster shutdown caused an exception: " + t.getMessage());
}
Expand All @@ -70,9 +73,9 @@ public void testOutDegreesInvalidEdgeSrcId() throws Exception {

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(PARALLELISM);

env.getConfig().disableSysoutLogging();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);

Expand All @@ -86,15 +89,16 @@ public void testOutDegreesInvalidEdgeSrcId() throws Exception {
}
}

/**
* Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/
@Test
public void testInDegreesInvalidEdgeTrgId() throws Exception {
/*
* Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
Expand All @@ -109,15 +113,16 @@ public void testInDegreesInvalidEdgeTrgId() throws Exception {
}
}

/**
* Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/
@Test
public void testGetDegreesInvalidEdgeTrgId() throws Exception {
/*
* Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
*/

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
Expand All @@ -132,15 +137,16 @@ public void testGetDegreesInvalidEdgeTrgId() throws Exception {
}
}

/**
* Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
*/
@Test
public void testGetDegreesInvalidEdgeSrcId() throws Exception {
/*
* Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
*/

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
Expand All @@ -155,15 +161,16 @@ public void testGetDegreesInvalidEdgeSrcId() throws Exception {
}
}

/**
* Test getDegrees() with an edge having a srcId and a trgId that does not exist in the vertex DataSet
*/
@Test
public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
/*
* Test getDegrees() with an edge having a srcId and a trgId that does not exist in the vertex DataSet
*/

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();

Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
Expand All @@ -173,7 +180,8 @@ public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
env.execute();

fail("graph.getDegrees() did not fail.");
} catch (Exception e) {
}
catch (Exception e) {
// We expect the job to fail with an exception
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void testProgramWithAutoParallelism() {
"localhost", cluster.getJobManagerRPCPort());

env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
env.getConfig().disableSysoutLogging();

DataSet<Integer> result = env
.createInput(new ParallelismDependentInputFormat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void testNullValues() {
ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());

env.setParallelism(1);
env.getConfig().disableSysoutLogging();

DataSet<String> data = env.fromElements("hallo")
.map(new MapFunction<String, String>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testFailedRunThenSuccessfulRun() {

env.setParallelism(4);
env.setNumberOfExecutionRetries(0);
env.getConfig().disableSysoutLogging();

env.generateSequence(1, 10)
.rebalance()
Expand Down Expand Up @@ -110,6 +111,7 @@ public Long reduce(Long value1, Long value2) {

env.setParallelism(4);
env.setNumberOfExecutionRetries(0);
env.getConfig().disableSysoutLogging();

env.generateSequence(1, 10)
.rebalance()
Expand Down Expand Up @@ -156,6 +158,7 @@ public void testRestart() {

env.setParallelism(4);
env.setNumberOfExecutionRetries(1);
env.getConfig().disableSysoutLogging();

env.generateSequence(1, 10)
.rebalance()
Expand Down Expand Up @@ -200,6 +203,7 @@ public void testRestartMultipleTimes() {

env.setParallelism(4);
env.setNumberOfExecutionRetries(5);
env.getConfig().disableSysoutLogging();

env.generateSequence(1, 10)
.rebalance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void testRestartWithFailingTaskManager() {

env.setParallelism(PARALLELISM);
env.setNumberOfExecutionRetries(1);
env.getConfig().disableSysoutLogging();

env.generateSequence(1, 10)
.map(new FailingMapper<Long>())
Expand Down

0 comments on commit 69a400f

Please sign in to comment.