Skip to content

Commit

Permalink
[FLINK-12154][network] Remove legacy fields for SingleInputGate (apac…
Browse files Browse the repository at this point in the history
…he#8136)

This work is a preparation for FLINK-11726.

In SingleInputGate#create, we could remove unused parameter ExecutionAttemptID.
And for the constructor of SingleInputGate, we could remove unused parameter TaskIOMetricGroup.
Then we introduce createSingleInputGate for reusing the process of creating SingleInputGate in related tests.
  • Loading branch information
zhijiangW authored and pnowojski committed Apr 15, 2019
1 parent 807bce2 commit c7ef6db
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
Expand Down Expand Up @@ -193,7 +192,6 @@ public SingleInputGate(
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
TaskIOMetricGroup metrics,
boolean isCreditBased) {

this.owningTaskName = checkNotNull(owningTaskName);
Expand Down Expand Up @@ -664,7 +662,6 @@ Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
public static SingleInputGate create(
String owningTaskName,
JobID jobId,
ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
TaskEventPublisher taskEventPublisher,
Expand All @@ -683,7 +680,7 @@ public static SingleInputGate create(

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics, networkConfig.isCreditBased());
icdd.length, taskActions, networkConfig.isCreditBased());

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ public Task(
SingleInputGate gate = SingleInputGate.create(
taskNameWithSubtaskAndId,
jobId,
executionId,
inputGateDeploymentDescriptor,
networkEnvironment,
taskEventDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
Expand Down Expand Up @@ -300,23 +300,13 @@ private static ResultPartition createResultPartition(
*
* @param partitionType
* the consumed partition type
* @param channels
* @param numberOfChannels
* the number of input channels
*
* @return input gate with some fake settings
*/
private SingleInputGate createSingleInputGate(
final ResultPartitionType partitionType, final int channels) {
return spy(new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
channels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
enableCreditBasedFlowControl));
private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
return spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, partitionType, enableCreditBasedFlowControl));
}

private static void createRemoteInputChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@

import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createBufferResponse;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testReceiveEmptyBuffer() throws Exception {
@Test
public void testReceiveBuffer() throws Exception {
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testReceiveBuffer() throws Exception {
*/
@Test
public void testThrowExceptionForNoAvailableBuffer() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));

final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
Expand Down Expand Up @@ -246,7 +246,7 @@ public void testNotifyCreditAvailable() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel1 = createRemoteInputChannel(inputGate, client);
final RemoteInputChannel inputChannel2 = createRemoteInputChannel(inputGate, client);
try {
Expand Down Expand Up @@ -347,7 +347,7 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
Expand All @@ -30,14 +29,11 @@
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
Expand All @@ -48,6 +44,7 @@

import java.io.IOException;

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,7 +128,7 @@ public void testReceiveEmptyBuffer() throws Exception {
@Test
public void testReceiveBuffer() throws Exception {
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
Expand Down Expand Up @@ -207,24 +204,6 @@ public void testCancelBeforeActive() throws Exception {

// ---------------------------------------------------------------------------------------------

/**
* Creates and returns the single input gate for credit-based testing.
*
* @return The new created single input gate.
*/
static SingleInputGate createSingleInputGate() {
return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
1,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
}

/**
* Creates and returns a remote input channel for the specific input gate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.junit.Test;

import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand All @@ -53,7 +53,7 @@ public void testRetriggerPartitionRequest() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client, 1, 2);

try {
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testDoublePartitionRequest() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -60,6 +64,26 @@ public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwab
return manager;
}

public static SingleInputGate createSingleInputGate(int numberOfChannels) {
return createSingleInputGate(numberOfChannels, ResultPartitionType.PIPELINED, true);
}

public static SingleInputGate createSingleInputGate(
int numberOfChannels,
ResultPartitionType partitionType,
boolean isCreditBased) {

return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
numberOfChannels,
new NoOpTaskActions(),
isCreditBased);
}

public static ConnectionManager createDummyConnectionManager() throws Exception {
final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
Expand All @@ -29,10 +28,8 @@
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.junit.Test;

Expand All @@ -43,6 +40,7 @@

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
Expand All @@ -64,14 +62,7 @@ public void testConsumptionWithLocalChannels() throws Exception {

final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0; i < numberOfChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
Expand Down Expand Up @@ -100,15 +91,7 @@ public void testConsumptionWithRemoteChannels() throws Exception {
final ConnectionManager connManager = createDummyConnectionManager();
final Source[] sources = new Source[numberOfChannels];

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0; i < numberOfChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
Expand Down Expand Up @@ -150,15 +133,7 @@ public void testConsumptionWithMixedChannels() throws Exception {

final Source[] sources = new Source[numberOfChannels];

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0, local = 0; i < numberOfChannels; i++) {
if (localOrRemote.get(i)) {
Expand Down
Loading

0 comments on commit c7ef6db

Please sign in to comment.