Skip to content

Commit

Permalink
[FLINK-11392][network] Rename NetworkEnviroment to NettyShuffleEnviro…
Browse files Browse the repository at this point in the history
…ment

This closes apache#8608.
  • Loading branch information
azagrebin authored and tillrohrmann committed Jun 11, 2019
1 parent 6349db7 commit c104daf
Show file tree
Hide file tree
Showing 52 changed files with 613 additions and 610 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.BatchTableEnvironment;
Expand Down Expand Up @@ -362,7 +362,7 @@ private abstract static class LimitNetworkBuffersTestEnvironment extends Executi
public static void setAsContext() {
Configuration config = new Configuration();
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
final LocalEnvironment le = new LocalEnvironment(config);

initializeContextEnvironment(new ExecutionEnvironmentFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";

/**
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";

/**
* Config parameter to override SSL support for taskmanager's data transport.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled";
Expand Down Expand Up @@ -270,7 +270,7 @@ public final class ConfigConstants {
* The config parameter defining the number of buffers used in the network stack. This defines the
* number of possible tasks and shuffles.
*
* @deprecated Use {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
* @deprecated Use {@link NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
*/
@Deprecated
public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
Expand Down Expand Up @@ -1392,15 +1392,15 @@ public final class ConfigConstants {
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_PORT} instead
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;

/**
* The default value to override ssl support for task manager's data transport.
*
* @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
* @deprecated use {@link NettyShuffleEnvironmentOptions#DATA_SSL_ENABLED} instead
*/
@Deprecated
public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true;
Expand All @@ -1424,7 +1424,7 @@ public final class ConfigConstants {
/**
* Config key has been deprecated. Therefore, no default value required.
*
* @deprecated {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
* @deprecated {@link NettyShuffleEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
*/
@Deprecated
public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@PublicEvolving
@ConfigGroups(groups = @ConfigGroup(name = "NetworkNetty", keyPrefix = "taskmanager.network.netty"))
public class NetworkEnvironmentOptions {
public class NettyShuffleEnvironmentOptions {

// ------------------------------------------------------------------------
// Network General Options
Expand Down Expand Up @@ -212,5 +212,5 @@ public class NetworkEnvironmentOptions {
// ------------------------------------------------------------------------

/** Not intended to be instantiated. */
private NetworkEnvironmentOptions() {}
private NettyShuffleEnvironmentOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -71,7 +71,7 @@ public void checkOperatingSystem() {
}

/**
* Tests that {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
* Tests that {@link NettyShuffleEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)}
* has the same result as the shell script.
*/
@Test
Expand Down Expand Up @@ -159,9 +159,9 @@ private static Configuration getConfig(
config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);

config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));

if (managedMemSizeMB == 0) {
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
Expand Down Expand Up @@ -202,7 +202,7 @@ private static Configuration getRandomConfig(final Random ran) {
Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
final int networkBufMB =
(int) (NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
(int) (NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
// max (exclusive): total - netbuf
managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
} else {
Expand All @@ -228,14 +228,14 @@ private void compareNetworkBufJavaVsScript(final Configuration config, final flo

final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);

long javaNetworkBufMem = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
long javaNetworkBufMem = NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory(
totalJavaMemorySizeMB << 20, config);

String[] command = {"src/test/bin/calcTMNetBufMem.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};
String.valueOf(config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};

String scriptOutput = executeScript(command);

Expand Down Expand Up @@ -272,9 +272,9 @@ private void compareHeapSizeJavaVsScript(final Configuration config, float toler
String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
totalJavaMemorySizeMB + "m",
String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
String.valueOf(config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand All @@ -72,9 +72,9 @@
* The network environment contains the data structures that keep track of all intermediate results
* and shuffle data exchanges.
*/
public class NetworkEnvironment implements ShuffleEnvironment<ResultPartition, SingleInputGate> {
public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartition, SingleInputGate> {

private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleEnvironment.class);

private static final String METRIC_GROUP_NETWORK = "Network";
private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
Expand All @@ -89,7 +89,7 @@ public class NetworkEnvironment implements ShuffleEnvironment<ResultPartition, S

private final ResourceID taskExecutorLocation;

private final NetworkEnvironmentConfiguration config;
private final NettyShuffleEnvironmentConfiguration config;

private final NetworkBufferPool networkBufferPool;

Expand All @@ -105,9 +105,9 @@ public class NetworkEnvironment implements ShuffleEnvironment<ResultPartition, S

private boolean isClosed;

private NetworkEnvironment(
private NettyShuffleEnvironment(
ResourceID taskExecutorLocation,
NetworkEnvironmentConfiguration config,
NettyShuffleEnvironmentConfiguration config,
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
Expand All @@ -124,8 +124,8 @@ private NetworkEnvironment(
this.isClosed = false;
}

public static NetworkEnvironment create(
NetworkEnvironmentConfiguration config,
public static NettyShuffleEnvironment create(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorLocation,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
Expand Down Expand Up @@ -165,7 +165,7 @@ public static NetworkEnvironment create(
taskEventPublisher,
networkBufferPool);

return new NetworkEnvironment(
return new NettyShuffleEnvironment(
taskExecutorLocation,
config,
networkBufferPool,
Expand Down Expand Up @@ -205,7 +205,7 @@ public NetworkBufferPool getNetworkBufferPool() {
}

@VisibleForTesting
public NetworkEnvironmentConfiguration getConfiguration() {
public NettyShuffleEnvironmentConfiguration getConfiguration() {
return config;
}

Expand Down Expand Up @@ -244,7 +244,7 @@ public Collection<ResultPartition> createResultPartitionWriters(
MetricGroup outputGroup,
MetricGroup buffersGroup) {
synchronized (lock) {
Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down.");
Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has already been shut down.");

ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
Expand All @@ -267,7 +267,7 @@ public Collection<SingleInputGate> createInputGates(
MetricGroup inputGroup,
MetricGroup buffersGroup) {
synchronized (lock) {
Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down.");
Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has already been shut down.");

InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(parentGroup);
SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
Expand Down Expand Up @@ -331,7 +331,7 @@ public boolean updatePartitionInfo(
@Override
public int start() throws IOException {
synchronized (lock) {
Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down.");
Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has already been shut down.");

LOG.info("Starting the network environment and its components.");

Expand Down Expand Up @@ -395,8 +395,8 @@ public boolean isClosed() {
}
}

public static NetworkEnvironment fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) {
NetworkEnvironmentConfiguration networkConfig = NetworkEnvironmentConfiguration.fromConfiguration(
public static NettyShuffleEnvironment fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) {
NettyShuffleEnvironmentConfiguration networkConfig = NettyShuffleEnvironmentConfiguration.fromConfiguration(
shuffleEnvironmentContext.getConfiguration(),
shuffleEnvironmentContext.getMaxJvmHeapMemory(),
shuffleEnvironmentContext.isLocalCommunicationOnly(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
Expand Down Expand Up @@ -149,9 +149,9 @@ public List<MemorySegment> requestMemorySegments() throws IOException {
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}

this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
Expand Down Expand Up @@ -284,9 +284,9 @@ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, O
totalNumberOfMemorySegments - numTotalRequiredBuffers,
totalNumberOfMemorySegments,
memorySegmentSize,
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
}

this.numTotalRequiredBuffers += numRequiredBuffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NetworkEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.net.SSLUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -98,37 +98,37 @@ public int getNumberOfSlots() {
// ------------------------------------------------------------------------

public int getServerConnectBacklog() {
return config.getInteger(NetworkEnvironmentOptions.CONNECT_BACKLOG);
return config.getInteger(NettyShuffleEnvironmentOptions.CONNECT_BACKLOG);
}

public int getNumberOfArenas() {
// default: number of slots
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_ARENAS);
final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getServerNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER);
final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getClientNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT);
final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}

public int getClientConnectTimeoutSeconds() {
return config.getInteger(NetworkEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
return config.getInteger(NettyShuffleEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
}

public int getSendAndReceiveBufferSize() {
return config.getInteger(NetworkEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
return config.getInteger(NettyShuffleEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
}

public TransportType getTransportType() {
String transport = config.getString(NetworkEnvironmentOptions.TRANSPORT_TYPE);
String transport = config.getString(NettyShuffleEnvironmentOptions.TRANSPORT_TYPE);

switch (transport) {
case "nio":
Expand All @@ -155,7 +155,7 @@ public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
}

public boolean getSSLEnabled() {
return config.getBoolean(NetworkEnvironmentOptions.DATA_SSL_ENABLED)
return config.getBoolean(NettyShuffleEnvironmentOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
Expand All @@ -38,7 +39,7 @@
import java.util.Optional;

/**
* Factory for {@link ResultPartition} to use in {@link org.apache.flink.runtime.io.network.NetworkEnvironment}.
* Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}.
*/
public class ResultPartitionFactory {
private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
Expand Down
Loading

0 comments on commit c104daf

Please sign in to comment.