Skip to content

Commit

Permalink
[hotfix][test] Introduce InputGateTestBase and deduplicate test code
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed May 10, 2019
1 parent 5b84dbf commit 07ae430
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.util.Arrays;
import java.util.List;

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.junit.Assert.assertEquals;

/**
* Test base for {@link InputGate}.
*/
@RunWith(Parameterized.class)
public abstract class InputGateTestBase {

@Parameter
public boolean enableCreditBasedFlowControl;

@Parameters(name = "Credit-based = {0}")
public static List<Boolean> parameters() {
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
}

protected SingleInputGate createInputGate() {
return createInputGate(2);
}

protected SingleInputGate createInputGate(int numberOfInputChannels) {
return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
}

protected SingleInputGate createInputGate(
int numberOfInputChannels, ResultPartitionType partitionType) {
SingleInputGate inputGate = createSingleInputGate(
numberOfInputChannels,
partitionType,
enableCreditBasedFlowControl);

assertEquals(partitionType, inputGate.getConsumedPartitionType());
return inputGate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,13 @@
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand All @@ -80,17 +75,7 @@
/**
* Tests for {@link SingleInputGate}.
*/
@RunWith(Parameterized.class)
public class SingleInputGateTest {

@Parameterized.Parameter
public boolean enableCreditBasedFlowControl;

@Parameterized.Parameters(name = "Credit-based = {0}")
public static List<Boolean> parameters() {
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
}

public class SingleInputGateTest extends InputGateTestBase {
/**
* Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
* value after receiving all end-of-partition events.
Expand Down Expand Up @@ -546,22 +531,6 @@ public void testUpdateUnknownInputChannel() throws Exception {

// ---------------------------------------------------------------------------------------------

private SingleInputGate createInputGate() {
return createInputGate(2);
}

private SingleInputGate createInputGate(int numberOfInputChannels) {
return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED);
}

private SingleInputGate createInputGate(int numberOfInputChannels, ResultPartitionType partitionType) {
SingleInputGate inputGate = createSingleInputGate(numberOfInputChannels, partitionType, enableCreditBasedFlowControl);

assertEquals(partitionType, inputGate.getConsumedPartitionType());

return inputGate;
}

private void addUnknownInputChannel(
NetworkEnvironment network,
SingleInputGate inputGate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Tests for {@link UnionInputGate}.
*/
public class UnionInputGateTest {
public class UnionInputGateTest extends InputGateTestBase {

/**
* Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
Expand All @@ -41,8 +41,8 @@ public class UnionInputGateTest {
@Test(timeout = 120 * 1000)
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate ig1 = createSingleInputGate(3);
final SingleInputGate ig2 = createSingleInputGate(5);
final SingleInputGate ig1 = createInputGate(3);
final SingleInputGate ig2 = createInputGate(5);

final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});

Expand Down

0 comments on commit 07ae430

Please sign in to comment.