Skip to content

Commit

Permalink
[FLINK-12800] Harden Tests when availableProcessors is 1
Browse files Browse the repository at this point in the history
This closes apache#8716.
  • Loading branch information
leesf authored and tillrohrmann committed Jun 14, 2019
1 parent c539333 commit 3fa062a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ private void verifyBranchingJoiningPlan(ExecutionMode execMode,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionMode(execMode);

// set parallelism to 2 to avoid chaining with source in case when available processors is 1.
env.setParallelism(2);

DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
.map(new MapFunction<Long, Tuple2<Long, Long>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public void testDisabledCheckpointing() throws Exception {
@Test
public void testChainStartEndSetting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// set parallelism to 2 to avoid chaining with source in case when available processors is 1.
env.setParallelism(2);

// fromElements -> CHAIN(Map -> Print)
env.fromElements(1, 2, 3)
.map(new MapFunction<Integer, Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ public void testOperatorChainWithProcessingTime() throws Exception {
private JobVertex createChainedVertex(boolean withLazyFunction) {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// set parallelism to 2 to avoid chaining with source in case when available processors is 1.
chainEnv.setParallelism(2);

// the input is only used to construct a chained operator, and they will not be used in the real tests.
DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public void testMultiChainingWithoutObjectReuse() throws Exception {
*/
private void testMultiChaining(StreamExecutionEnvironment env) throws Exception {

// set parallelism to 2 to avoid chaining with source in case when available processors is 1.
env.setParallelism(2);

// the actual elements will not be used
DataStream<Integer> input = env.fromElements(1, 2, 3);

Expand Down Expand Up @@ -175,6 +178,9 @@ public void testMultiChainingWithSplitWithoutObjectReuse() throws Exception {
*/
private void testMultiChainingWithSplit(StreamExecutionEnvironment env) throws Exception {

// set parallelism to 2 to avoid chaining with source in case when available processors is 1.
env.setParallelism(2);

// the actual elements will not be used
DataStream<Integer> input = env.fromElements(1, 2, 3);

Expand Down

0 comments on commit 3fa062a

Please sign in to comment.