Skip to content

Commit

Permalink
[fixup] Rebase master and address some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijiangW committed Apr 4, 2019
1 parent b7d46f6 commit f9ffd25
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public NetworkEnvironmentConfiguration getNetworkConfig() {
return networkConfig;
}

@Nullable
QueryableStateConfiguration getQueryableStateConfig() {
return queryableStateConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.ConfigurationParserUtils;

import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import static org.apache.flink.util.MathUtils.checkedDownCast;

/**
* Configuration object for the network stack.
*/
Expand All @@ -48,8 +52,10 @@ public class NetworkEnvironmentConfiguration {

private final int partitionRequestMaxBackoff;

/** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */
private final int networkBuffersPerChannel;

/** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */
private final int floatingNetworkBuffersPerGate;

private final boolean isCreditBased;
Expand Down Expand Up @@ -129,46 +135,13 @@ public static NetworkEnvironmentConfiguration fromConfiguration(
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress) {

// ----> hosts / ports for communication and data exchange

final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");

final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
final int dataport = getDataport(configuration);

final int numNetworkBuffers;
if (!hasNewNetworkConfig(configuration)) {
// fallback: number of network buffers
numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);

checkOldNetworkConfig(numNetworkBuffers);
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
}

final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);

// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
long numNetworkBuffersLong = networkMemorySize / pageSize;
if (numNetworkBuffersLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+ ") corresponds to more than MAX_INT pages.");
}
numNetworkBuffers = (int) numNetworkBuffersLong;
}
final int pageSize = getPageSize(configuration);

final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);

nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
} else {
nettyConfig = null;
}
final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);

int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
Expand All @@ -179,7 +152,7 @@ public static NetworkEnvironmentConfiguration fromConfiguration(
boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);

return new NetworkEnvironmentConfiguration(
numNetworkBuffers,
numberOfNetworkBuffers,
pageSize,
initialRequestBackoff,
maxRequestBackoff,
Expand Down Expand Up @@ -263,7 +236,7 @@ public static long calculateNewNetworkBufferMemory(Configuration config, long ma
*/
@SuppressWarnings("deprecation")
public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
int segmentSize = ConfigurationParserUtils.getPageSize(config);
final int segmentSize = getPageSize(config);

final long networkBufBytes;
if (hasNewNetworkConfig(config)) {
Expand Down Expand Up @@ -312,7 +285,7 @@ private static long calculateNewNetworkBufferMemory(Configuration config, long n
long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();

int pageSize = ConfigurationParserUtils.getPageSize(config);
int pageSize = getPageSize(config);

checkNewNetworkConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

Expand Down Expand Up @@ -402,11 +375,112 @@ public static boolean hasNewNetworkConfig(final Configuration config) {
!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
}

/**
* Parses the hosts / ports for communication and data exchange from configuration.
*
* @param configuration configuration object
* @return the data port
*/
private static int getDataport(Configuration configuration) {
final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");

return dataport;
}

/**
* Calculates the number of network buffers based on configuration and jvm heap size.
*
* @param configuration configuration object
* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
* @return the number of network buffers
*/
private static int calculateNumberOfNetworkBuffers(Configuration configuration, long maxJvmHeapMemory) {
final int numberOfNetworkBuffers;
if (!hasNewNetworkConfig(configuration)) {
// fallback: number of network buffers
numberOfNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);

checkOldNetworkConfig(numberOfNetworkBuffers);
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
}

final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);

// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
long numberOfNetworkBuffersLong = networkMemorySize / getPageSize(configuration);
if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+ ") corresponds to more than MAX_INT pages.");
}
numberOfNetworkBuffers = (int) numberOfNetworkBuffersLong;
}

return numberOfNetworkBuffers;
}

/**
* Parses the configuration to wrapper related components into netty configuration which might be null if
* communication is in the same task manager.
*
* @param configuration configuration object
* @param localTaskManagerCommunication true, to skip initializing the network stack
* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
* @param dataport data port for communication and data exchange
* @return the netty configuration
*/
@Nullable
private static NettyConfig createNettyConfig(
Configuration configuration,
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress,
int dataport) {

final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
} else {
nettyConfig = null;
}

return nettyConfig;
}

/**
* Parses the configuration to get the page size and validates the value.
*
* @param configuration configuration object
* @return size of memory segment
*/
private static int getPageSize(Configuration configuration) {
final int pageSize = checkedDownCast(MemorySize.parse(
configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

// check page size of for minimum size
ConfigurationParserUtils.checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
// check page size for power of two
ConfigurationParserUtils.checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Memory segment size must be a power of 2.");

return pageSize;
}

// ------------------------------------------------------------------------

@Override
public int hashCode() {
int result = 1;
result = 31 * result + numNetworkBuffers;
result = 31 * result + networkBufferSize;
result = 31 * result + partitionRequestInitialBackoff;
result = 31 * result + partitionRequestMaxBackoff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.MathUtils;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
import static org.apache.flink.util.MathUtils.checkedDownCast;

/**
* Utility class to extract related parameters from {@link Configuration} and to
Expand Down Expand Up @@ -97,28 +94,6 @@ public static MemoryType getMemoryType(Configuration configuration) {
return memType;
}

/**
* Parses the configuration to get the page size and validates the value.
*
* @param configuration configuration object
* @return size of memory segment
*/
public static int getPageSize(Configuration configuration) {
final int pageSize = checkedDownCast(MemorySize.parse(
configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

// check page size of for minimum size
checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
// check page size for power of two
checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Memory segment size must be a power of 2.");

return pageSize;
}

/**
* Parses the configuration to get the number of slots and validates the value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public static List<Boolean> parameters() {
public void testRegisterTaskUsesBoundedBuffers() throws Exception {
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
.build(),
new TaskEventDispatcher());

// result partitions
ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
Expand Down Expand Up @@ -180,11 +181,11 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
}

private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
final NetworkEnvironment network = new NetworkEnvironment(
new NetworkEnvironmentConfigurationBuilder()
.setNumNetworkBuffers(bufferPoolSize)
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setNumNetworkBuffers(bufferPoolSize)
.setIsCreditBased(enableCreditBasedFlowControl)
.build(),
new TaskEventDispatcher());

final ConnectionManager connManager = createDummyConnectionManager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
Expand Down Expand Up @@ -228,8 +229,8 @@ public void testReleaseMemoryOnPipelinedPartition() throws Exception {
private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
final int numAllBuffers = 10;
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setNumNetworkBuffers(numAllBuffers)
.build());
.setNumNetworkBuffers(numAllBuffers).build(),
new TaskEventDispatcher());
final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier();
final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public void testRequestBackoffConfiguration() throws Exception {
.setPartitionRequestInitialBackoff(initialBackoff)
.setPartitionRequestMaxBackoff(maxBackoff)
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
.build(),
new TaskEventDispatcher());

SingleInputGate gate = SingleInputGate.create(
"TestTask",
Expand Down Expand Up @@ -406,9 +407,7 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception {
final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
final NetworkEnvironment network = createNetworkEnvironment();

try {
final ResultPartitionID resultPartitionId = new ResultPartitionID();
Expand Down Expand Up @@ -446,9 +445,7 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
int buffersPerChannel = 2;
int extraNetworkBuffersPerGate = 8;
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
final NetworkEnvironment network = createNetworkEnvironment();

try {
final ResultPartitionID resultPartitionId = new ResultPartitionID();
Expand Down Expand Up @@ -498,9 +495,7 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
@Test
public void testUpdateUnknownInputChannel() throws Exception {
final SingleInputGate inputGate = createInputGate(2);
final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setIsCreditBased(enableCreditBasedFlowControl)
.build());
final NetworkEnvironment network = createNetworkEnvironment();

try {
final ResultPartitionID localResultPartitionId = new ResultPartitionID();
Expand Down Expand Up @@ -610,6 +605,13 @@ private void addRemoteInputChannel(
inputGate.setInputChannel(partitionId.getPartitionId(), remote);
}

private NetworkEnvironment createNetworkEnvironment() {
return new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder()
.setIsCreditBased(enableCreditBasedFlowControl)
.build(),
new TaskEventDispatcher());
}

static void verifyBufferOrEvent(
InputGate inputGate,
boolean expectedIsBuffer,
Expand Down
Loading

0 comments on commit f9ffd25

Please sign in to comment.