From 79f432878888fcf35fba5c2d8ace71ea49ca6741 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Thu, 21 Mar 2019 16:35:06 +0800 Subject: [PATCH 1/9] [FLINK-12067][network] Refactor the constructor of NetworkEnvironment --- .../io/network/NetworkEnvironment.java | 92 ++---- .../partition/consumer/SingleInputGate.java | 14 +- .../taskexecutor/TaskManagerRunner.java | 1 + .../taskexecutor/TaskManagerServices.java | 150 +--------- .../TaskManagerServicesConfiguration.java | 270 ++++++++++++------ .../NetworkEnvironmentConfiguration.java | 45 ++- ...etworkEnvironmentConfigurationBuilder.java | 95 ++++++ .../io/network/NetworkEnvironmentTest.java | 14 +- .../partition/ResultPartitionTest.java | 5 +- .../consumer/SingleInputGateTest.java | 28 +- ...skExecutorLocalStateStoresManagerTest.java | 6 +- .../NetworkBufferCalculationTest.java | 89 +++--- .../taskexecutor/TaskExecutorTest.java | 9 +- .../StreamNetworkBenchmarkEnvironment.java | 29 +- 14 files changed, 415 insertions(+), 432 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfigurationBuilder.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 1d2536512f2ef..7380ebe835b98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -22,10 +22,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -48,6 +51,8 @@ public class NetworkEnvironment { private final Object lock = new Object(); + private final NetworkEnvironmentConfiguration config; + private final NetworkBufferPool networkBufferPool; private final ConnectionManager connectionManager; @@ -56,64 +61,25 @@ public class NetworkEnvironment { private final TaskEventPublisher taskEventPublisher; - private final int partitionRequestInitialBackoff; - - private final int partitionRequestMaxBackoff; - - /** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */ - private final int networkBuffersPerChannel; + private boolean isShutdown; - /** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ - private final int extraNetworkBuffersPerGate; + public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + this.config = checkNotNull(config); - private final boolean enableCreditBased; + this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); - private boolean isShutdown; + NettyConfig nettyConfig = config.nettyConfig(); + if (nettyConfig != null) { + this.connectionManager = new NettyConnectionManager(nettyConfig); + } else { + this.connectionManager = new LocalConnectionManager(); + } - public NetworkEnvironment( - int numBuffers, - int memorySegmentSize, - int partitionRequestInitialBackoff, - int partitionRequestMaxBackoff, - int networkBuffersPerChannel, - int extraNetworkBuffersPerGate, - boolean enableCreditBased) { - this( - new NetworkBufferPool(numBuffers, memorySegmentSize), - new LocalConnectionManager(), - new ResultPartitionManager(), - new TaskEventDispatcher(), - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - networkBuffersPerChannel, - extraNetworkBuffersPerGate, - enableCreditBased); - } + this.resultPartitionManager = new ResultPartitionManager(); - public NetworkEnvironment( - NetworkBufferPool networkBufferPool, - ConnectionManager connectionManager, - ResultPartitionManager resultPartitionManager, - TaskEventPublisher taskEventPublisher, - int partitionRequestInitialBackoff, - int partitionRequestMaxBackoff, - int networkBuffersPerChannel, - int extraNetworkBuffersPerGate, - boolean enableCreditBased) { - - this.networkBufferPool = checkNotNull(networkBufferPool); - this.connectionManager = checkNotNull(connectionManager); - this.resultPartitionManager = checkNotNull(resultPartitionManager); this.taskEventPublisher = checkNotNull(taskEventPublisher); - this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; - this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; - isShutdown = false; - this.networkBuffersPerChannel = networkBuffersPerChannel; - this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate; - - this.enableCreditBased = enableCreditBased; } // -------------------------------------------------------------------------------------------- @@ -132,16 +98,8 @@ public NetworkBufferPool getNetworkBufferPool() { return networkBufferPool; } - public int getPartitionRequestInitialBackoff() { - return partitionRequestInitialBackoff; - } - - public int getPartitionRequestMaxBackoff() { - return partitionRequestMaxBackoff; - } - - public boolean isCreditBased() { - return enableCreditBased; + public NetworkEnvironmentConfiguration getConfiguration() { + return config; } // -------------------------------------------------------------------------------------------- @@ -174,8 +132,8 @@ public void setupPartition(ResultPartition partition) throws IOException { try { int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ? - partition.getNumberOfSubpartitions() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; + partition.getNumberOfSubpartitions() * config.networkBuffersPerChannel() + + config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; // If the partition type is back pressure-free, we register with the buffer pool for // callbacks to release memory. bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), @@ -203,17 +161,17 @@ public void setupInputGate(SingleInputGate gate) throws IOException { BufferPool bufferPool = null; int maxNumberOfMemorySegments; try { - if (enableCreditBased) { + if (config.isCreditBased()) { maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - extraNetworkBuffersPerGate : Integer.MAX_VALUE; + config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; // assign exclusive buffers to input channels directly and use the rest for floating buffers - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); + gate.assignExclusiveSegments(networkBufferPool, config.networkBuffersPerChannel()); bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); } else { maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; + gate.getNumberOfInputChannels() * config.networkBuffersPerChannel() + + config.floatingNetworkBuffersPerGate() : Integer.MAX_VALUE; bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), maxNumberOfMemorySegments); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index f457ccbe64fcf..a437c55e1ee18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -680,7 +680,7 @@ public static SingleInputGate create( final SingleInputGate inputGate = new SingleInputGate( owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex, - icdd.length, taskActions, metrics, networkEnvironment.isCreditBased()); + icdd.length, taskActions, metrics, networkEnvironment.getConfiguration().isCreditBased()); // Create the input channels. There is one input channel for each consumed partition. final InputChannel[] inputChannels = new InputChannel[icdd.length]; @@ -697,8 +697,8 @@ public static SingleInputGate create( inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, networkEnvironment.getResultPartitionManager(), taskEventPublisher, - networkEnvironment.getPartitionRequestInitialBackoff(), - networkEnvironment.getPartitionRequestMaxBackoff(), + networkEnvironment.getConfiguration().partitionRequestInitialBackoff(), + networkEnvironment.getConfiguration().partitionRequestMaxBackoff(), metrics ); @@ -708,8 +708,8 @@ else if (partitionLocation.isRemote()) { inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialBackoff(), - networkEnvironment.getPartitionRequestMaxBackoff(), + networkEnvironment.getConfiguration().partitionRequestInitialBackoff(), + networkEnvironment.getConfiguration().partitionRequestMaxBackoff(), metrics ); @@ -720,8 +720,8 @@ else if (partitionLocation.isUnknown()) { networkEnvironment.getResultPartitionManager(), taskEventPublisher, networkEnvironment.getConnectionManager(), - networkEnvironment.getPartitionRequestInitialBackoff(), - networkEnvironment.getPartitionRequestMaxBackoff(), + networkEnvironment.getConfiguration().partitionRequestInitialBackoff(), + networkEnvironment.getConfiguration().partitionRequestMaxBackoff(), metrics ); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 8b74889ff2cac..0ef5a065f458f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -353,6 +353,7 @@ public static TaskExecutor startTaskManager( TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( configuration, + EnvironmentInformation.getMaxJvmHeapMemory(), remoteAddress, localCommunicationOnly); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index c3bea14d2b749..c6f6bec5715af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -30,20 +30,12 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ConnectionManager; -import org.apache.flink.runtime.io.network.LocalConnectionManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.TaskEventPublisher; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -247,8 +239,8 @@ public static TaskManagerServices fromConfiguration( final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); - final NetworkEnvironment network = createNetworkEnvironment( - taskManagerServicesConfiguration, maxJvmHeapMemory, taskEventDispatcher); + final NetworkEnvironment network = new NetworkEnvironment( + taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher); network.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); @@ -400,60 +392,6 @@ private static MemoryManager createMemoryManager( return memoryManager; } - /** - * Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}. - * - * @param taskManagerServicesConfiguration to construct the network environment from - * @param maxJvmHeapMemory the maximum JVM heap size - * @return Network environment - * @throws IOException - */ - private static NetworkEnvironment createNetworkEnvironment( - TaskManagerServicesConfiguration taskManagerServicesConfiguration, - long maxJvmHeapMemory, - TaskEventPublisher taskEventPublisher) { - - NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig(); - - final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory); - int segmentSize = networkEnvironmentConfiguration.networkBufferSize(); - - // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) - final long numNetBuffersLong = networkBuf / segmentSize; - if (numNetBuffersLong > Integer.MAX_VALUE) { - throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf - + ") corresponds to more than MAX_INT pages."); - } - - NetworkBufferPool networkBufferPool = new NetworkBufferPool( - (int) numNetBuffersLong, - segmentSize); - - ConnectionManager connectionManager; - boolean enableCreditBased = false; - NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig(); - if (nettyConfig != null) { - connectionManager = new NettyConnectionManager(nettyConfig); - enableCreditBased = nettyConfig.isCreditBasedEnabled(); - } else { - connectionManager = new LocalConnectionManager(); - } - - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - - // we start the network first, to make sure it can allocate its buffers first - return new NetworkEnvironment( - networkBufferPool, - connectionManager, - resultPartitionManager, - taskEventPublisher, - networkEnvironmentConfiguration.partitionRequestInitialBackoff(), - networkEnvironmentConfiguration.partitionRequestMaxBackoff(), - networkEnvironmentConfiguration.networkBuffersPerChannel(), - networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(), - enableCreditBased); - } - /** * Calculates the amount of memory used for network buffers based on the total memory to use and * the according configuration parameters. @@ -531,90 +469,6 @@ public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Config return networkBufBytes; } - /** - * Calculates the amount of memory used for network buffers inside the current JVM instance - * based on the available heap or the max heap size and the according configuration parameters. - * - *

For containers or when started via scripts, if started with a memory limit and set to use - * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able - * to extract the intended values from this. - * - *

The following configuration parameters are involved: - *

. - * - * @param tmConfig task manager services configuration object - * @param maxJvmHeapMemory the maximum JVM heap size - * - * @return memory to use for network buffers (in bytes) - */ - public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig, long maxJvmHeapMemory) { - final NetworkEnvironmentConfiguration networkConfig = tmConfig.getNetworkConfig(); - - final float networkBufFraction = networkConfig.networkBufFraction(); - final long networkBufMin = networkConfig.networkBufMin(); - final long networkBufMax = networkConfig.networkBufMax(); - - if (networkBufMin == networkBufMax) { - // fixed network buffer pool size - return networkBufMin; - } - - // relative network buffer pool size using the fraction... - - // The maximum heap memory has been adjusted as in - // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)) - // and we need to invert these calculations. - - final MemoryType memType = tmConfig.getMemoryType(); - - final long jvmHeapNoNet; - if (memType == MemoryType.HEAP) { - jvmHeapNoNet = maxJvmHeapMemory; - } else if (memType == MemoryType.OFF_HEAP) { - - // check if a value has been configured - long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes - - if (configuredMemory > 0) { - // The maximum heap memory has been adjusted according to configuredMemory, i.e. - // maxJvmHeap = jvmHeapNoNet - configuredMemory - - jvmHeapNoNet = maxJvmHeapMemory + configuredMemory; - } else { - // The maximum heap memory has been adjusted according to the fraction, i.e. - // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction) - - final float managedFraction = tmConfig.getMemoryFraction(); - jvmHeapNoNet = (long) (maxJvmHeapMemory / (1.0 - managedFraction)); - } - } else { - throw new RuntimeException("No supported memory type detected."); - } - - // finally extract the network buffer memory size again from: - // jvmHeapNoNet = jvmHeap - networkBufBytes - // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) - final long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, - (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction))); - - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes < maxJvmHeapMemory, - "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", - "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", - "Network buffer memory size too large: " + networkBufBytes + " >= " + - maxJvmHeapMemory + "(maximum JVM heap size)"); - - return networkBufBytes; - } - /** * Calculates the amount of heap memory to use (to set via -Xmx and -Xms) * based on the total memory to use and the given configuration parameters. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index e791a92a42815..a050ed5c0e3af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; @@ -209,6 +210,7 @@ public RetryingRegistrationConfiguration getRetryingRegistrationConfiguration() * sanity check them. * * @param configuration The configuration. + * @param maxJvmHeapMemory The maximum JVM heap size, in bytes. * @param remoteAddress identifying the IP address under which the TaskManager will be accessible * @param localCommunication True, to skip initializing the network stack. * Use only in cases where only one task manager runs. @@ -216,8 +218,9 @@ public RetryingRegistrationConfiguration getRetryingRegistrationConfiguration() */ public static TaskManagerServicesConfiguration fromConfiguration( Configuration configuration, + long maxJvmHeapMemory, InetAddress remoteAddress, - boolean localCommunication) throws Exception { + boolean localCommunication) { // we need this because many configs have been written with a "-1" entry int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); @@ -227,62 +230,24 @@ public static TaskManagerServicesConfiguration fromConfiguration( final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration); String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration); - if (localStateRootDir.length == 0) { // default to temp dirs. localStateRootDir = tmpDirs; } - boolean localRecoveryMode = configuration.getBoolean( - CheckpointingOptions.LOCAL_RECOVERY.key(), - CheckpointingOptions.LOCAL_RECOVERY.defaultValue()); + boolean localRecoveryMode = configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY); final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration( configuration, + maxJvmHeapMemory, localCommunication, remoteAddress, slots); - final QueryableStateConfiguration queryableStateConfig = - parseQueryableStateConfiguration(configuration); - - // extract memory settings - long configuredMemory; - String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(); - if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) { - try { - configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes(); - } catch (IllegalArgumentException e) { - throw new IllegalConfigurationException( - "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e); - } - } else { - configuredMemory = Long.valueOf(managedMemorySizeDefaultVal); - } - - checkConfigParameter( - configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) || - configuredMemory > 0, configuredMemory, - TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), - "MemoryManager needs at least one MB of memory. " + - "If you leave this config parameter empty, the system automatically " + - "pick a fraction of the available memory."); - - // check whether we use heap or off-heap memory - final MemoryType memType; - if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { - memType = MemoryType.OFF_HEAP; - } else { - memType = MemoryType.HEAP; - } + final QueryableStateConfiguration queryableStateConfig = parseQueryableStateConfiguration(configuration); boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE); - float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); - checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction, - TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), - "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); - long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis(); final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration); @@ -295,10 +260,10 @@ public static TaskManagerServicesConfiguration fromConfiguration( networkConfig, queryableStateConfig, slots, - configuredMemory, - memType, + getManagedMemorySize(configuration), + getMemoryType(configuration), preAllocateMemory, - memoryFraction, + getManagedMemoryFraction(configuration), timerServiceShutdownTimeout, retryingRegistrationConfiguration, ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); @@ -312,6 +277,7 @@ public static TaskManagerServicesConfiguration fromConfiguration( * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}. * * @param configuration to create the network environment configuration from + * @param maxJvmHeapMemory The maximum JVM heap size, in bytes * @param localTaskManagerCommunication true if task manager communication is local * @param taskManagerAddress address of the task manager * @param slots to start the task manager with @@ -320,51 +286,43 @@ public static TaskManagerServicesConfiguration fromConfiguration( @SuppressWarnings("deprecation") private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration( Configuration configuration, + long maxJvmHeapMemory, boolean localTaskManagerCommunication, InetAddress taskManagerAddress, - int slots) throws Exception { + int slots) { // ----> hosts / ports for communication and data exchange int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); - checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), "Leave config parameter empty or use 0 to let the system choose a port automatically."); checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(), "Number of task slots must be at least one."); - final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); - - // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, - TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), - "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); - - // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, - TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), - "Memory segment size must be a power of 2."); - - // network buffer memory fraction - - float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); - long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); - long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); - checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax); - - // fallback: number of network buffers - final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); - checkNetworkConfigOld(numNetworkBuffers); + final int pageSize = getPageSize(configuration); + final int numNetworkBuffers; if (!hasNewNetworkBufConf(configuration)) { - // map old config to new one: - networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize; + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkNetworkConfigOld(numNetworkBuffers); } else { if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { LOG.info("Ignoring old (but still present) network buffer configuration via {}.", TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); } + + final long networkMemorySize = calculateNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; } final NettyConfig nettyConfig; @@ -377,28 +335,92 @@ private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfigurat nettyConfig = null; } - int initialRequestBackoff = configuration.getInteger( - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); - int maxRequestBackoff = configuration.getInteger( - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); - int buffersPerChannel = configuration.getInteger( - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); - int extraBuffersPerGate = configuration.getInteger( - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); return new NetworkEnvironmentConfiguration( - networkBufFraction, - networkBufMin, - networkBufMax, + numNetworkBuffers, pageSize, initialRequestBackoff, maxRequestBackoff, buffersPerChannel, extraBuffersPerGate, + isCreditBased, nettyConfig); } + /** + * Calculates the amount of memory used for network buffers inside the current JVM instance + * based on the available heap or the max heap size and the according configuration parameters. + * + *

For containers or when started via scripts, if started with a memory limit and set to use + * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able + * to extract the intended values from this. + * + *

The following configuration parameters are involved: + *

. + * + * @param configuration configuration object + * @param maxJvmHeapMemory the maximum JVM heap size + * + * @return memory to use for network buffers (in bytes) + */ + @VisibleForTesting + static long calculateNetworkBufferMemory(Configuration configuration, long maxJvmHeapMemory) { + // The maximum heap memory has been adjusted as in TaskManagerServices#calculateHeapSizeMB + // and we need to invert these calculations. + final long jvmHeapNoNet; + final MemoryType memoryType = getMemoryType(configuration); + if (memoryType == MemoryType.HEAP) { + jvmHeapNoNet = maxJvmHeapMemory; + } else if (memoryType == MemoryType.OFF_HEAP) { + long configuredMemory = getManagedMemorySize(configuration) << 20; // megabytes to bytes + if (configuredMemory > 0) { + // The maximum heap memory has been adjusted according to configuredMemory, i.e. + // maxJvmHeap = jvmHeapNoNet - configuredMemory + jvmHeapNoNet = maxJvmHeapMemory + configuredMemory; + } else { + // The maximum heap memory has been adjusted according to the fraction, i.e. + // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * managedFraction = jvmHeapNoNet * (1 - managedFraction) + jvmHeapNoNet = (long) (maxJvmHeapMemory / (1.0 - getManagedMemoryFraction(configuration))); + } + } else { + throw new RuntimeException("No supported memory type detected."); + } + + float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); + long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); + long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); + checkNetworkBufferConfig(getPageSize(configuration), networkBufFraction, networkBufMin, networkBufMax); + + // finally extract the network buffer memory size again from: + // jvmHeapNoNet = jvmHeap - networkBufBytes + // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) + long networkMemorySize = Math.min(networkBufMax, Math.max(networkBufMin, + (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction))); + + TaskManagerServicesConfiguration.checkConfigParameter(networkMemorySize < maxJvmHeapMemory, + "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", + "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", + "Network buffer memory size too large: " + networkMemorySize + " >= " + + maxJvmHeapMemory + "(maximum JVM heap size)"); + + return networkMemorySize; + } + /** * Validates the (old) network buffer configuration. * @@ -508,4 +530,88 @@ static void checkConfigParameter(boolean condition, Object parameter, String nam name + " : " + parameter + " - " + errorMessage); } } + + /** + * Parses the configuration to get the managed memory size and validates the value. + * + * @param configuration configuration object + * @return managed memory size (in megabytes) + */ + private static long getManagedMemorySize(Configuration configuration) { + long managedMemorySize; + String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(); + if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) { + try { + managedMemorySize = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes(); + } catch (IllegalArgumentException e) { + throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e); + } + } else { + managedMemorySize = Long.valueOf(managedMemorySizeDefaultVal); + } + + checkConfigParameter( + configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) || + managedMemorySize > 0, managedMemorySize, + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically pick a fraction of the available memory."); + + return managedMemorySize; + } + + /** + * Parses the configuration to get the fraction of managed memory and validates the value. + * + * @param configuration configuration object + * @return fraction of managed memory + */ + private static float getManagedMemoryFraction(Configuration configuration) { + float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); + + checkConfigParameter(managedMemoryFraction > 0.0f && managedMemoryFraction < 1.0f, managedMemoryFraction, + TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), + "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); + + return managedMemoryFraction; + } + + /** + * Parses the configuration to get the type of memory. + * + * @param configuration configuration object + * @return type of memory + */ + private static MemoryType getMemoryType(Configuration configuration) { + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { + memType = MemoryType.OFF_HEAP; + } else { + memType = MemoryType.HEAP; + } + return memType; + } + + /** + * Parses the configuration to get the page size and validates the value. + * + * @param configuration configuration object + * @return size of memory segment + */ + private static int getPageSize(Configuration configuration) { + final int pageSize = checkedDownCast(MemorySize.parse( + configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); + + // check page size of for minimum size + checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), + "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); + // check page size for power of two + checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), + "Memory segment size must be a power of 2."); + + return pageSize; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java index fec5178e5ed67..dd8606c174455 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java @@ -27,11 +27,7 @@ */ public class NetworkEnvironmentConfiguration { - private final float networkBufFraction; - - private final long networkBufMin; - - private final long networkBufMax; + private final int numNetworkBuffers; private final int networkBufferSize; @@ -43,42 +39,34 @@ public class NetworkEnvironmentConfiguration { private final int floatingNetworkBuffersPerGate; + private final boolean isCreditBased; + private final NettyConfig nettyConfig; public NetworkEnvironmentConfiguration( - float networkBufFraction, - long networkBufMin, - long networkBufMax, + int numNetworkBuffers, int networkBufferSize, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, + boolean isCreditBased, @Nullable NettyConfig nettyConfig) { - this.networkBufFraction = networkBufFraction; - this.networkBufMin = networkBufMin; - this.networkBufMax = networkBufMax; + this.numNetworkBuffers = numNetworkBuffers; this.networkBufferSize = networkBufferSize; this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; this.networkBuffersPerChannel = networkBuffersPerChannel; this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate; + this.isCreditBased = isCreditBased; this.nettyConfig = nettyConfig; } // ------------------------------------------------------------------------ - public float networkBufFraction() { - return networkBufFraction; - } - - public long networkBufMin() { - return networkBufMin; - } - - public long networkBufMax() { - return networkBufMax; + public int numNetworkBuffers() { + return numNetworkBuffers; } public int networkBufferSize() { @@ -105,6 +93,10 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + // ------------------------------------------------------------------------ @Override @@ -115,6 +107,7 @@ public int hashCode() { result = 31 * result + partitionRequestMaxBackoff; result = 31 * result + networkBuffersPerChannel; result = 31 * result + floatingNetworkBuffersPerGate; + result = 31 * result + (isCreditBased ? 1 : 0); result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0); return result; } @@ -130,14 +123,13 @@ else if (obj == null || getClass() != obj.getClass()) { else { final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj; - return this.networkBufFraction == that.networkBufFraction && - this.networkBufMin == that.networkBufMin && - this.networkBufMax == that.networkBufMax && + return this.numNetworkBuffers == that.numNetworkBuffers && this.networkBufferSize == that.networkBufferSize && this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff && this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff && this.networkBuffersPerChannel == that.networkBuffersPerChannel && this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate && + this.isCreditBased == that.isCreditBased && (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null); } } @@ -145,14 +137,13 @@ else if (obj == null || getClass() != obj.getClass()) { @Override public String toString() { return "NetworkEnvironmentConfiguration{" + - "networkBufFraction=" + networkBufFraction + - ", networkBufMin=" + networkBufMin + - ", networkBufMax=" + networkBufMax + + ", numNetworkBuffers=" + numNetworkBuffers + ", networkBufferSize=" + networkBufferSize + ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff + ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff + ", networkBuffersPerChannel=" + networkBuffersPerChannel + ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate + + ", isCreditBased=" + isCreditBased + ", nettyConfig=" + nettyConfig + '}'; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfigurationBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfigurationBuilder.java new file mode 100644 index 0000000000000..4927a8fafcbff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfigurationBuilder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.runtime.io.network.netty.NettyConfig; + +/** + * Builder for the {@link NetworkEnvironmentConfiguration}. + */ +public class NetworkEnvironmentConfigurationBuilder { + + private int numNetworkBuffers = 1024; + + private int networkBufferSize = 32 * 1024; + + private int partitionRequestInitialBackoff = 0; + + private int partitionRequestMaxBackoff = 0; + + private int networkBuffersPerChannel = 2; + + private int floatingNetworkBuffersPerGate = 8; + + private boolean isCreditBased = true; + + private NettyConfig nettyConfig; + + public NetworkEnvironmentConfigurationBuilder setNumNetworkBuffers(int numNetworkBuffers) { + this.numNetworkBuffers = numNetworkBuffers; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setNetworkBufferSize(int networkBufferSize) { + this.networkBufferSize = networkBufferSize; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setPartitionRequestInitialBackoff(int partitionRequestInitialBackoff) { + this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setPartitionRequestMaxBackoff(int partitionRequestMaxBackoff) { + this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setNetworkBuffersPerChannel(int networkBuffersPerChannel) { + this.networkBuffersPerChannel = networkBuffersPerChannel; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setFloatingNetworkBuffersPerGate(int floatingNetworkBuffersPerGate) { + this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setIsCreditBased(boolean isCreditBased) { + this.isCreditBased = isCreditBased; + return this; + } + + public NetworkEnvironmentConfigurationBuilder setNettyConfig(NettyConfig nettyConfig) { + this.nettyConfig = nettyConfig; + return this; + } + + public NetworkEnvironmentConfiguration build() { + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + networkBufferSize, + partitionRequestInitialBackoff, + partitionRequestMaxBackoff, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + isCreditBased, + nettyConfig); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index f400dfa432f86..cff178626995d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.Task; @@ -56,9 +57,6 @@ */ @RunWith(Parameterized.class) public class NetworkEnvironmentTest { - private static final int numBuffers = 1024; - - private static final int memorySegmentSize = 128; @Parameterized.Parameter public boolean enableCreditBasedFlowControl; @@ -77,8 +75,9 @@ public static List parameters() { */ @Test public void testRegisterTaskUsesBoundedBuffers() throws Exception { - final NetworkEnvironment network = new NetworkEnvironment( - numBuffers, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl); + final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); // result partitions ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2); @@ -182,7 +181,10 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception { private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception { final NetworkEnvironment network = new NetworkEnvironment( - bufferPoolSize, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl); + new NetworkEnvironmentConfigurationBuilder() + .setNumNetworkBuffers(bufferPoolSize) + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); final ConnectionManager connManager = createDummyConnectionManager(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 8009d0cbb7a81..84230102397fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.TaskActions; @@ -226,7 +227,9 @@ public void testReleaseMemoryOnPipelinedPartition() throws Exception { */ private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { final int numAllBuffers = 10; - final NetworkEnvironment network = new NetworkEnvironment(numAllBuffers, 128, 0, 0, 2, 8, true); + final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setNumNetworkBuffers(numAllBuffers) + .build()); final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier(); final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index d9e936df5f7d3..1e1d8942bcb85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.junit.Test; @@ -341,8 +342,11 @@ public void testRequestBackoffConfiguration() throws Exception { int initialBackoff = 137; int maxBackoff = 1001; - final NetworkEnvironment netEnv = new NetworkEnvironment( - 100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl); + final NetworkEnvironment netEnv = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setPartitionRequestInitialBackoff(initialBackoff) + .setPartitionRequestMaxBackoff(maxBackoff) + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); SingleInputGate gate = SingleInputGate.create( "TestTask", @@ -402,8 +406,9 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception { final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED); int buffersPerChannel = 2; int extraNetworkBuffersPerGate = 8; - final NetworkEnvironment network = new NetworkEnvironment( - 100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl); + final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); try { final ResultPartitionID resultPartitionId = new ResultPartitionID(); @@ -441,8 +446,9 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception { final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED); int buffersPerChannel = 2; int extraNetworkBuffersPerGate = 8; - final NetworkEnvironment network = new NetworkEnvironment( - 100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl); + final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); try { final ResultPartitionID resultPartitionId = new ResultPartitionID(); @@ -492,9 +498,9 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception { @Test public void testUpdateUnknownInputChannel() throws Exception { final SingleInputGate inputGate = createInputGate(2); - int buffersPerChannel = 2; - final NetworkEnvironment network = new NetworkEnvironment( - 100, 32, 0, 0, buffersPerChannel, 8, enableCreditBasedFlowControl); + final NetworkEnvironment network = new NetworkEnvironment(new NetworkEnvironmentConfigurationBuilder() + .setIsCreditBased(enableCreditBasedFlowControl) + .build()); try { final ResultPartitionID localResultPartitionId = new ResultPartitionID(); @@ -586,8 +592,8 @@ private UnknownInputChannel createUnknownInputChannel( network.getResultPartitionManager(), new TaskEventDispatcher(), network.getConnectionManager(), - network.getPartitionRequestInitialBackoff(), - network.getPartitionRequestMaxBackoff(), + network.getConfiguration().partitionRequestInitialBackoff(), + network.getConfiguration().partitionRequestMaxBackoff(), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup() ); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 97539bdb0f43f..e99522360a7d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -43,7 +43,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static final long MEM_SIZE_PARAM = 128L*1024*1024; + private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024; /** * This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory @@ -67,7 +67,7 @@ public void testCreationFromConfig() throws Exception { final ResourceID tmResourceID = ResourceID.generate(); TaskManagerServicesConfiguration taskManagerServicesConfiguration = - TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + TaskManagerServicesConfiguration.fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLocalHost(), true); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, @@ -108,7 +108,7 @@ public void testCreationFromConfigDefault() throws Exception { final ResourceID tmResourceID = ResourceID.generate(); TaskManagerServicesConfiguration taskManagerServicesConfiguration = - TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true); + TaskManagerServicesConfiguration.fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLocalHost(), true); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java index 7aa43efc4aa0f..6c23602096b7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java @@ -18,19 +18,12 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.net.InetAddress; -import java.util.Optional; - -import static org.apache.flink.util.MathUtils.checkedDownCast; import static org.junit.Assert.assertEquals; /** @@ -39,78 +32,66 @@ public class NetworkBufferCalculationTest extends TestLogger { /** - * Test for {@link TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration, long)} + * Test for {@link TaskManagerServicesConfiguration#calculateNetworkBufferMemory(Configuration, long)} * using the same (manual) test cases as in {@link TaskManagerServicesTest#calculateHeapSizeMB()}. */ @Test - public void calculateNetworkBufFromHeapSize() throws Exception { - TaskManagerServicesConfiguration tmConfig; + public void calculateNetworkBufFromHeapSize() { + Configuration config; - tmConfig = getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()), + config = getConfig( + Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()), TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), - 0.1f, 60L << 20, 1L << 30, MemoryType.HEAP); + 0.1f, 60L << 20, 1L << 30, false); assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */, - TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 900L << 20)); // 900MB + TaskManagerServicesConfiguration.calculateNetworkBufferMemory(config, 900L << 20)); // 900MB - tmConfig = getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()), + config = getConfig( + Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()), TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), - 0.2f, 60L << 20, 1L << 30, MemoryType.HEAP); + 0.2f, 60L << 20, 1L << 30, false); assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */, - TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 800L << 20)); // 800MB + TaskManagerServicesConfiguration.calculateNetworkBufferMemory(config, 800L << 20)); // 800MB - tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), - 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP); + config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), + 0.1f, 60L << 20, 1L << 30, true); assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */, - TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 890L << 20)); // 890MB + TaskManagerServicesConfiguration.calculateNetworkBufferMemory(config, 890L << 20)); // 890MB - tmConfig = getTmConfig(-1, 0.1f, - 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP); + config = getConfig(0, 0.1f, 0.1f, 60L << 20, 1L << 30, true); assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */, - TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 810L << 20)); // 810MB + TaskManagerServicesConfiguration.calculateNetworkBufferMemory(config, 810L << 20)); // 810MB } /** - * Returns a task manager services configuration for the tests + * Returns a configuration for the tests. * * @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} * @param networkBufFraction see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION} * @param networkBufMin see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} * @param networkBufMax see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX} - * @param memType on-heap or off-heap + * @param offHeapMemory see {@link TaskManagerOptions#MEMORY_OFF_HEAP} * * @return configuration object */ - private static TaskManagerServicesConfiguration getTmConfig( - final long managedMemory, final float managedMemoryFraction, float networkBufFraction, - long networkBufMin, long networkBufMax, - final MemoryType memType) { + private static Configuration getConfig( + final long managedMemory, + final float managedMemoryFraction, + float networkBufFraction, + long networkBufMin, + long networkBufMax, + boolean offHeapMemory) { + + final Configuration configuration = new Configuration(); - final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration( - networkBufFraction, - networkBufMin, - networkBufMax, - checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(), - null); + configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), managedMemory); + configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction); + configuration.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction); + configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin); + configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax); + configuration.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP.key(), offHeapMemory); - return new TaskManagerServicesConfiguration( - InetAddress.getLoopbackAddress(), - new String[] {}, - new String[] {}, - false, - networkConfig, - QueryableStateConfiguration.disabled(), - 1, - managedMemory, - memType, - false, - managedMemoryFraction, - 0, - RetryingRegistrationConfiguration.defaultConfiguration(), - Optional.empty()); + return configuration; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index c7ae4502688e8..a2a538e6d521a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -89,6 +89,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -262,13 +263,7 @@ public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception { false); final NetworkEnvironment networkEnvironment = new NetworkEnvironment( - 1, - 1, - 0, - 0, - 2, - 8, - true); + new NetworkEnvironmentConfigurationBuilder().build()); networkEnvironment.start(); final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 12315e1297166..8c65f1b96f8dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -32,23 +32,21 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -207,21 +205,14 @@ private NetworkEnvironment createNettyNetworkEnvironment( slots = 1; } - final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize); - - final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( - new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config)); - - return new NetworkEnvironment( - bufferPool, - nettyConnectionManager, - new ResultPartitionManager(), - new TaskEventDispatcher(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(), - true); + final NettyConfig nettyConfig = new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config); + final NetworkEnvironmentConfiguration configuration = new NetworkEnvironmentConfigurationBuilder() + .setNumNetworkBuffers(bufferPoolSize) + .setNetworkBufferSize(segmentSize) + .setNettyConfig(nettyConfig) + .build(); + + return new NetworkEnvironment(configuration); } protected ResultPartitionWriter createResultPartition( From 7eb345d3ba0b3d9ee456b85551d8a22bbd0d65b5 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Tue, 2 Apr 2019 18:50:15 +0800 Subject: [PATCH 2/9] [fixup] Refactor the process of generating NetworkEnvironmentConfiguraiton --- ...anagerHeapSizeCalculationJavaBashTest.java | 10 +- .../QueryableStateConfiguration.java | 29 ++ .../taskexecutor/TaskManagerServices.java | 92 +--- .../TaskManagerServicesConfiguration.java | 403 +----------------- .../NetworkEnvironmentConfiguration.java | 305 +++++++++++++ .../util/ConfigurationParserUtils.java | 159 +++++++ .../ContaineredTaskManagerParametersTest.java | 10 +- .../NetworkBufferCalculationTest.java | 13 +- ... NetworkEnvironmentConfigurationTest.java} | 124 +++++- .../TaskManagerServicesConfigurationTest.java | 86 +--- .../YARNSessionCapacitySchedulerITCase.java | 10 +- 11 files changed, 646 insertions(+), 595 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java rename flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/{TaskManagerServicesTest.java => NetworkEnvironmentConfigurationTest.java} (59%) diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java index 2b84e181cd8d3..4c5c39f9ce15c 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.taskexecutor.TaskManagerServices; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; @@ -69,8 +70,8 @@ public void checkOperatingSystem() { } /** - * Tests that {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} has the same - * result as the shell script. + * Tests that {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} + * has the same result as the shell script. */ @Test public void compareNetworkBufShellScriptWithJava() throws Exception { @@ -200,7 +201,7 @@ private static Configuration getRandomConfig(final Random ran) { Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac); long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes final int networkBufMB = - (int) (TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20); + (int) (NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20); // max (exclusive): total - netbuf managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE)); } else { @@ -226,7 +227,8 @@ private void compareNetworkBufJavaVsScript(final Configuration config, final flo final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L); - long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config); + long javaNetworkBufMem = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory( + totalJavaMemorySizeMB << 20, config); String[] command = {"src/test/bin/calcTMNetBufMem.sh", totalJavaMemorySizeMB + "m", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index 7823a1a889b12..fec9f622f4670 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.util.NetUtils; @@ -137,4 +138,32 @@ public static QueryableStateConfiguration disabled() { final Iterator serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()); return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0); } + + /** + * Creates the {@link QueryableStateConfiguration} from the given Configuration. + */ + public static QueryableStateConfiguration fromConfiguration(Configuration config) { + if (!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) { + return null; + } + + final Iterator proxyPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.PROXY_PORT_RANGE)); + final Iterator serverPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.SERVER_PORT_RANGE)); + + final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS); + final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS); + + final int numStateServerNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); + final int numStateServerQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); + + return new QueryableStateConfiguration( + proxyPorts, + serverPorts, + numProxyServerNetworkThreads, + numProxyServerQueryThreads, + numStateServerNetworkThreads, + numStateServerQueryThreads); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index c6f6bec5715af..a67ea3cc6ff2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -36,7 +36,9 @@ import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -52,7 +54,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES; -import static org.apache.flink.util.MathUtils.checkedDownCast; /** * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, @@ -392,83 +393,6 @@ private static MemoryManager createMemoryManager( return memoryManager; } - /** - * Calculates the amount of memory used for network buffers based on the total memory to use and - * the according configuration parameters. - * - *

The following configuration parameters are involved: - *

    - *
  • {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
  • - *
  • {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
  • - *
  • {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and
  • - *
  • {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)
  • - *
. - * - * @param totalJavaMemorySize - * overall available memory to use (heap and off-heap, in bytes) - * @param config - * configuration object - * - * @return memory to use for network buffers (in bytes); at least one memory segment - */ - @SuppressWarnings("deprecation") - public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) { - Preconditions.checkArgument(totalJavaMemorySize > 0); - - int segmentSize = - checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); - - final long networkBufBytes; - if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) { - // new configuration based on fractions of available memory with selectable min and max - float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); - long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes(); - long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes(); - - TaskManagerServicesConfiguration - .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax); - - networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, - (long) (networkBufFraction * totalJavaMemorySize))); - - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes < totalJavaMemorySize, - "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", - "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", - "Network buffer memory size too large: " + networkBufBytes + " >= " + - totalJavaMemorySize + " (total JVM memory size)"); - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes >= segmentSize, - "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", - "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", - "Network buffer memory size too small: " + networkBufBytes + " < " + - segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); - } else { - // use old (deprecated) network buffers parameter - int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); - networkBufBytes = (long) numNetworkBuffers * (long) segmentSize; - - TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers); - - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes < totalJavaMemorySize, - networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), - "Network buffer memory size too large: " + networkBufBytes + " >= " + - totalJavaMemorySize + " (total JVM memory size)"); - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes >= segmentSize, - networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), - "Network buffer memory size too small: " + networkBufBytes + " < " + - segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); - } - - return networkBufBytes; - } - /** * Calculates the amount of heap memory to use (to set via -Xmx and -Xms) * based on the total memory to use and the given configuration parameters. @@ -484,10 +408,9 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration Preconditions.checkArgument(totalJavaMemorySizeMB > 0); // subtract the Java memory used for network buffers (always off-heap) - final long networkBufMB = - calculateNetworkBufferMemory( - totalJavaMemorySizeMB << 20, // megabytes to bytes - config) >> 20; // bytes to megabytes + final long networkBufMB = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory( + totalJavaMemorySizeMB << 20, // megabytes to bytes + config) >> 20; // bytes to megabytes final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB; // split the available Java memory between heap and off-heap @@ -516,9 +439,8 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration offHeapSize = (long) (fraction * remainingJavaMemorySizeMB); } - TaskManagerServicesConfiguration - .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize, - TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), + ConfigurationParserUtils.checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize, + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "Managed memory size too large for " + networkBufMB + " MB network buffer memory and a total of " + totalJavaMemorySizeMB + " MB JVM memory"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index a050ed5c0e3af..0e17196c6b575 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -18,36 +18,22 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.NetUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.runtime.util.ConfigurationParserUtils; import javax.annotation.Nullable; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Iterator; import java.util.Optional; -import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES; -import static org.apache.flink.util.MathUtils.checkedDownCast; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -56,7 +42,6 @@ * the io manager and the metric registry. */ public class TaskManagerServicesConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServicesConfiguration.class); private final InetAddress taskManagerAddress; @@ -133,7 +118,7 @@ public TaskManagerServicesConfiguration( // Getter/Setter // -------------------------------------------------------------------------------------------- - public InetAddress getTaskManagerAddress() { + InetAddress getTaskManagerAddress() { return taskManagerAddress; } @@ -141,11 +126,11 @@ public String[] getTmpDirPaths() { return tmpDirPaths; } - public String[] getLocalRecoveryStateRootDirectories() { + String[] getLocalRecoveryStateRootDirectories() { return localRecoveryStateRootDirectories; } - public boolean isLocalRecoveryEnabled() { + boolean isLocalRecoveryEnabled() { return localRecoveryEnabled; } @@ -153,7 +138,7 @@ public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } - public QueryableStateConfiguration getQueryableStateConfig() { + QueryableStateConfiguration getQueryableStateConfig() { return queryableStateConfig; } @@ -170,7 +155,7 @@ public float getMemoryFraction() { * * @return on-heap or off-heap memory */ - public MemoryType getMemoryType() { + MemoryType getMemoryType() { return memoryType; } @@ -181,15 +166,15 @@ public MemoryType getMemoryType() { * * @see TaskManagerOptions#MANAGED_MEMORY_SIZE */ - public long getConfiguredMemory() { + long getConfiguredMemory() { return configuredMemory; } - public boolean isPreAllocateMemory() { + boolean isPreAllocateMemory() { return preAllocateMemory; } - public long getTimerServiceShutdownTimeout() { + long getTimerServiceShutdownTimeout() { return timerServiceShutdownTimeout; } @@ -197,7 +182,7 @@ public Optional