Skip to content

Commit

Permalink
[hotfix] [tests] Speed up StreamCheckpointNotifierITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 27, 2017
1 parent 4fbc631 commit fc597f6
Showing 1 changed file with 13 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -35,12 +34,11 @@
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;

import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,6 +50,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -73,45 +72,11 @@
* successfully completed checkpoint.
*/
@SuppressWarnings("serial")
public class StreamCheckpointNotifierITCase extends TestLogger {
public class StreamCheckpointNotifierITCase extends StreamingMultipleProgramsTestBase {

private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);

private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_TASK_SLOTS = 3;
private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

private static LocalFlinkMiniCluster cluster;

@BeforeClass
public static void startCluster() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);

cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
}
catch (Exception e) {
e.printStackTrace();
fail("Failed to start test cluster: " + e.getMessage());
}
}

@AfterClass
public static void stopCluster() {
try {
cluster.stop();
cluster = null;
}
catch (Exception e) {
e.printStackTrace();
fail("Failed to stop test cluster: " + e.getMessage());
}
}
private static final int PARALLELISM = 4;

/**
* Runs the following program:
Expand All @@ -123,18 +88,17 @@ public static void stopCluster() {
@Test
public void testProgram() {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());

env.setParallelism(PARALLELISM);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
assertEquals("test setup broken", PARALLELISM, env.getParallelism());

env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

final int numElements = 10000;
final int numTaskTotal = PARALLELISM * 5;

DataStream<Long> stream = env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal));

stream
// -------------- first vertex, chained to the src ----------------
.filter(new LongRichFilterFunction())
Expand Down

0 comments on commit fc597f6

Please sign in to comment.