Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12067][network] Refactor the constructor of NetworkEnvironment #8090

Merged
merged 9 commits into from
Apr 12, 2019
Prev Previous commit
Next Next commit
[fixup] Rebase master to solve conflicts
  • Loading branch information
zhijiangW committed Apr 11, 2019
commit d0f53531b1403292e9b9f9b14488d7a0e77694f9
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,10 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
Expand All @@ -57,17 +52,22 @@
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;

import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import javax.annotation.Nonnull;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -231,59 +231,33 @@ private static JobManagerConnection createJobManagerConnection(JobID jobId, JobM
partitionProducerStateChecker);
}

private static NetworkEnvironment createNetworkEnvironment(boolean localCommunication, Configuration configuration, RpcService testingRpcService, boolean mockNetworkEnvironment) throws Exception {
final ConnectionManager connectionManager;
if (!localCommunication) {
NettyConfig nettyConfig = TaskManagerServicesConfiguration
.fromConfiguration(configuration, InetAddress.getByName(testingRpcService.getAddress()), localCommunication).getNetworkConfig()
.nettyConfig();
connectionManager = new NettyConnectionManager(nettyConfig);
} else {
connectionManager = new LocalConnectionManager();
}

final int numAllBuffers = 10;
private static NetworkEnvironment createNetworkEnvironment(
boolean localCommunication,
Configuration configuration,
RpcService testingRpcService,
boolean mockNetworkEnvironment) throws Exception {
final NetworkEnvironment networkEnvironment;
if (mockNetworkEnvironment) {
networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
} else {
networkEnvironment = createNetworkEnvironment(
numAllBuffers,
128,
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL),
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX),
2,
8,
true,
connectionManager);
final InetSocketAddress socketAddress = new InetSocketAddress(
InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(TaskManagerOptions.DATA_PORT));

final NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(),
NetworkEnvironmentConfiguration.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);

networkEnvironment = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setPartitionRequestInitialBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
.setPartitionRequestMaxBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX))
.setNettyConfig(localCommunication ? null : nettyConfig)
.build(),
new TaskEventDispatcher());
networkEnvironment.start();
}

return networkEnvironment;
}

@Nonnull
private static NetworkEnvironment createNetworkEnvironment(
int numBuffers,
int memorySegmentSize,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int extraNetworkBuffersPerGate,
boolean enableCreditBased,
ConnectionManager connectionManager) {
return new NetworkEnvironment(
new NetworkBufferPool(numBuffers, memorySegmentSize),
connectionManager,
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
networkBuffersPerChannel,
extraNetworkBuffersPerGate,
enableCreditBased);
}

@Override
public void close() throws Exception {
RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
Expand Down