Skip to content

Commit

Permalink
[FLINK-12067][network] Refactor the constructor of NetworkEnvironment (
Browse files Browse the repository at this point in the history
…apache#8090)

[FLINK-12067][network] Refactor the constructor of NetworkEnvironment

The constructor of NetworkEnvironment was refactored to take only a NetworkEnvironmentConfiguration. The other related components, such as TaskEventDispatcher, ResultPartitionManager, and NetworkBufferPool, are created internally.

We also refactor how the NetworkEnvironmentConfiguration is generated in TaskManagerServicesConfiguration: it now carries numNetworkBuffers instead of the previous networkBufFraction, networkBufMin, and networkBufMax. This makes it simpler and more direct to construct the NetworkBufferPool later. The isCreditBased field is also kept in this component so that the setting can be used in tests.

Furthermore, we introduce the NetworkEnvironmentConfigurationBuilder to make it easy to create a NetworkEnvironmentConfiguration, especially in tests.
  • Loading branch information
zhijiangW authored and pnowojski committed Apr 12, 2019
1 parent 76841be commit 352a995
Show file tree
Hide file tree
Showing 25 changed files with 951 additions and 908 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -69,8 +70,8 @@ public void checkOperatingSystem() {
}

/**
* Tests that {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} has the same
* result as the shell script.
* Tests that {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
* has the same result as the shell script.
*/
@Test
public void compareNetworkBufShellScriptWithJava() throws Exception {
Expand Down Expand Up @@ -200,7 +201,7 @@ private static Configuration getRandomConfig(final Random ran) {
Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
final int networkBufMB =
(int) (TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
(int) (NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
// max (exclusive): total - netbuf
managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
} else {
Expand All @@ -226,7 +227,8 @@ private void compareNetworkBufJavaVsScript(final Configuration config, final flo

final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);

long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config);
long javaNetworkBufMem = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
totalJavaMemorySizeMB << 20, config);

String[] command = {"src/test/bin/calcTMNetBufMem.sh",
totalJavaMemorySizeMB + "m",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
Expand All @@ -48,6 +51,8 @@ public class NetworkEnvironment {

private final Object lock = new Object();

private final NetworkEnvironmentConfiguration config;

private final NetworkBufferPool networkBufferPool;

private final ConnectionManager connectionManager;
Expand All @@ -56,64 +61,25 @@ public class NetworkEnvironment {

private final TaskEventPublisher taskEventPublisher;

private final int partitionRequestInitialBackoff;

private final int partitionRequestMaxBackoff;

/** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */
private final int networkBuffersPerChannel;
private boolean isShutdown;

/** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */
private final int extraNetworkBuffersPerGate;
public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) {
this.config = checkNotNull(config);

private final boolean enableCreditBased;
this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());

private boolean isShutdown;
NettyConfig nettyConfig = config.nettyConfig();
if (nettyConfig != null) {
this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased());
} else {
this.connectionManager = new LocalConnectionManager();
}

public NetworkEnvironment(
int numBuffers,
int memorySegmentSize,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int extraNetworkBuffersPerGate,
boolean enableCreditBased) {
this(
new NetworkBufferPool(numBuffers, memorySegmentSize),
new LocalConnectionManager(),
new ResultPartitionManager(),
new TaskEventDispatcher(),
partitionRequestInitialBackoff,
partitionRequestMaxBackoff,
networkBuffersPerChannel,
extraNetworkBuffersPerGate,
enableCreditBased);
}
this.resultPartitionManager = new ResultPartitionManager();

public NetworkEnvironment(
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int extraNetworkBuffersPerGate,
boolean enableCreditBased) {

this.networkBufferPool = checkNotNull(networkBufferPool);
this.connectionManager = checkNotNull(connectionManager);
this.resultPartitionManager = checkNotNull(resultPartitionManager);
this.taskEventPublisher = checkNotNull(taskEventPublisher);

this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;

isShutdown = false;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;

this.enableCreditBased = enableCreditBased;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -132,16 +98,8 @@ public NetworkBufferPool getNetworkBufferPool() {
return networkBufferPool;
}

public int getPartitionRequestInitialBackoff() {
return partitionRequestInitialBackoff;
}

public int getPartitionRequestMaxBackoff() {
return partitionRequestMaxBackoff;
}

public boolean isCreditBased() {
return enableCreditBased;
public NetworkEnvironmentConfiguration getConfiguration() {
return config;
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -174,8 +132,8 @@ public void setupPartition(ResultPartition partition) throws IOException {

try {
int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
partition.getNumberOfSubpartitions() * config.networkBuffersPerChannel() +
config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;
// If the partition type is back pressure-free, we register with the buffer pool for
// callbacks to release memory.
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
Expand Down Expand Up @@ -203,17 +161,17 @@ public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
if (config.isCreditBased()) {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;

// assign exclusive buffers to input channels directly and use the rest for floating buffers
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel());
bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
} else {
maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
gate.getNumberOfInputChannels() * config.networkBuffersPerChannel() +
config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE;

bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
maxNumberOfMemorySegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,6 @@ public boolean getSSLEnabled() {
&& SSLUtils.isInternalSSLEnabled(config);
}

public boolean isCreditBasedEnabled() {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}

/** Returns the {@link Configuration} stored in this object's {@code config} field. */
public Configuration getConfig() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,21 @@ public class NettyConnectionManager implements ConnectionManager {

private final PartitionRequestClientFactory partitionRequestClientFactory;

public NettyConnectionManager(NettyConfig nettyConfig) {
private final boolean isCreditBased;

public NettyConnectionManager(NettyConfig nettyConfig, boolean isCreditBased) {
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);

this.isCreditBased = isCreditBased;
}

@Override
public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) throws IOException {
NettyProtocol partitionRequestProtocol = new NettyProtocol(
partitionProvider,
taskEventPublisher,
client.getConfig().isCreditBasedEnabled());
NettyProtocol partitionRequestProtocol = new NettyProtocol(partitionProvider, taskEventPublisher, isCreditBased);

client.init(partitionRequestProtocol, bufferPool);
server.init(partitionRequestProtocol, bufferPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskActions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -678,9 +679,11 @@ public static SingleInputGate create(

final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());

final NetworkEnvironmentConfiguration networkConfig = networkEnvironment.getConfiguration();

final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics, networkEnvironment.isCreditBased());
icdd.length, taskActions, metrics, networkConfig.isCreditBased());

// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
Expand All @@ -697,8 +700,8 @@ public static SingleInputGate create(
inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId,
networkEnvironment.getResultPartitionManager(),
taskEventPublisher,
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand All @@ -708,8 +711,8 @@ else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand All @@ -720,8 +723,8 @@ else if (partitionLocation.isUnknown()) {
networkEnvironment.getResultPartitionManager(),
taskEventPublisher,
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
networkEnvironment.getPartitionRequestMaxBackoff(),
networkConfig.partitionRequestInitialBackoff(),
networkConfig.partitionRequestMaxBackoff(),
metrics
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.util.NetUtils;

Expand Down Expand Up @@ -137,4 +138,32 @@ public static QueryableStateConfiguration disabled() {
final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue());
return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0);
}

/**
 * Builds a {@link QueryableStateConfiguration} from the values found in the given {@link Configuration}.
 *
 * <p>The proxy and server port ranges as well as the network/query thread counts are read from the
 * corresponding {@link QueryableStateOptions} entries.
 *
 * @param config configuration to read the queryable state settings from
 * @return the resulting configuration, or {@code null} if
 *         {@link QueryableStateOptions#ENABLE_QUERYABLE_STATE_PROXY_SERVER} evaluates to {@code false}
 */
public static QueryableStateConfiguration fromConfiguration(Configuration config) {
	final boolean proxyServerEnabled = config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER);
	if (!proxyServerEnabled) {
		// proxy server is switched off: no configuration object is created
		return null;
	}

	// resolve the port ranges first, proxy before server
	final String proxyPortRange = config.getString(QueryableStateOptions.PROXY_PORT_RANGE);
	final Iterator<Integer> proxyPortIterator = NetUtils.getPortRangeFromString(proxyPortRange);

	final String serverPortRange = config.getString(QueryableStateOptions.SERVER_PORT_RANGE);
	final Iterator<Integer> serverPortIterator = NetUtils.getPortRangeFromString(serverPortRange);

	// thread counts are read in argument order: proxy (network, query), then state server (network, query)
	return new QueryableStateConfiguration(
		proxyPortIterator,
		serverPortIterator,
		config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS),
		config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS),
		config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS),
		config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ public static TaskExecutor startTaskManager(
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(
configuration,
EnvironmentInformation.getMaxJvmHeapMemory(),
remoteAddress,
localCommunicationOnly);

Expand Down
Loading

0 comments on commit 352a995

Please sign in to comment.