Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12154][network] Remove legacy fields for SingleInputGate #8136

Merged
merged 1 commit into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
Expand Down Expand Up @@ -193,7 +192,6 @@ public SingleInputGate(
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
TaskIOMetricGroup metrics,
boolean isCreditBased) {

this.owningTaskName = checkNotNull(owningTaskName);
Expand Down Expand Up @@ -664,7 +662,6 @@ Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
public static SingleInputGate create(
String owningTaskName,
JobID jobId,
ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
TaskEventPublisher taskEventPublisher,
Expand All @@ -683,7 +680,7 @@ public static SingleInputGate create(

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics, networkConfig.isCreditBased());
icdd.length, taskActions, networkConfig.isCreditBased());

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ public Task(
SingleInputGate gate = SingleInputGate.create(
taskNameWithSubtaskAndId,
jobId,
executionId,
inputGateDeploymentDescriptor,
networkEnvironment,
taskEventDispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
Expand Down Expand Up @@ -300,23 +300,13 @@ private static ResultPartition createResultPartition(
*
* @param partitionType
* the consumed partition type
* @param channels
* @param numberOfChannels
* the number of input channels
*
* @return input gate with some fake settings
*/
private SingleInputGate createSingleInputGate(
final ResultPartitionType partitionType, final int channels) {
return spy(new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
channels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
enableCreditBasedFlowControl));
private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
return spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, partitionType, enableCreditBasedFlowControl));
}

private static void createRemoteInputChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@

import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createBufferResponse;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testReceiveEmptyBuffer() throws Exception {
@Test
public void testReceiveBuffer() throws Exception {
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testReceiveBuffer() throws Exception {
*/
@Test
public void testThrowExceptionForNoAvailableBuffer() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate));

final CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
Expand Down Expand Up @@ -246,7 +246,7 @@ public void testNotifyCreditAvailable() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel1 = createRemoteInputChannel(inputGate, client);
final RemoteInputChannel inputChannel2 = createRemoteInputChannel(inputGate, client);
try {
Expand Down Expand Up @@ -347,7 +347,7 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
Expand All @@ -30,14 +29,11 @@
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
Expand All @@ -48,6 +44,7 @@

import java.io.IOException;

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -131,7 +128,7 @@ public void testReceiveEmptyBuffer() throws Exception {
@Test
public void testReceiveBuffer() throws Exception {
final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8);
Expand Down Expand Up @@ -207,24 +204,6 @@ public void testCancelBeforeActive() throws Exception {

// ---------------------------------------------------------------------------------------------

/**
* Creates and returns the single input gate for credit-based testing.
*
* @return The new created single input gate.
*/
static SingleInputGate createSingleInputGate() {
return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
ResultPartitionType.PIPELINED,
0,
1,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
}

/**
* Creates and returns a remote input channel for the specific input gate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.junit.Test;

import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand All @@ -53,7 +53,7 @@ public void testRetriggerPartitionRequest() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client, 1, 2);

try {
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testDoublePartitionRequest() throws Exception {
channel, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));

final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
final SingleInputGate inputGate = createSingleInputGate();
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, client);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -60,6 +64,26 @@ public ResultSubpartitionView answer(InvocationOnMock invocation) throws Throwab
return manager;
}

public static SingleInputGate createSingleInputGate(int numberOfChannels) {
return createSingleInputGate(numberOfChannels, ResultPartitionType.PIPELINED, true);
}

public static SingleInputGate createSingleInputGate(
int numberOfChannels,
ResultPartitionType partitionType,
boolean isCreditBased) {

return new SingleInputGate(
"InputGate",
new JobID(),
new IntermediateDataSetID(),
partitionType,
0,
numberOfChannels,
new NoOpTaskActions(),
isCreditBased);
}

public static ConnectionManager createDummyConnectionManager() throws Exception {
final PartitionRequestClient mockClient = mock(PartitionRequestClient.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
Expand All @@ -29,10 +28,8 @@
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;

import org.junit.Test;

Expand All @@ -43,6 +40,7 @@

import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
Expand All @@ -64,14 +62,7 @@ public void testConsumptionWithLocalChannels() throws Exception {

final ResultPartitionManager resultPartitionManager = createResultPartitionManager(partitions);

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0; i < numberOfChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
Expand Down Expand Up @@ -100,15 +91,7 @@ public void testConsumptionWithRemoteChannels() throws Exception {
final ConnectionManager connManager = createDummyConnectionManager();
final Source[] sources = new Source[numberOfChannels];

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0; i < numberOfChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
Expand Down Expand Up @@ -150,15 +133,7 @@ public void testConsumptionWithMixedChannels() throws Exception {

final Source[] sources = new Source[numberOfChannels];

final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numberOfChannels,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
true);
final SingleInputGate gate = createSingleInputGate(numberOfChannels);

for (int i = 0, local = 0; i < numberOfChannels; i++) {
if (localOrRemote.get(i)) {
Expand Down
Loading