Skip to content

Commit

Permalink
[FLINK-2773] remove strict upper direct memory limit
Browse files Browse the repository at this point in the history
Setting a strict upper limit for the direct memory size can cause
problems with the direct memory allocation of the Netty network stack,
leading to OutOfMemoryErrors.

This closes apache#1203.
  • Loading branch information
mxm committed Sep 30, 2015
1 parent 1243d7b commit 011cbbf
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 75 deletions.
19 changes: 2 additions & 17 deletions flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers"
# BEGIN:deprecated
KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes"
# END:deprecated
KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"

KEY_ENV_PID_DIR="env.pid.dir"
Expand Down Expand Up @@ -198,22 +193,12 @@ fi

# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
fi

# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
if [ "${BUFFER_SIZE}" -eq "0" ]; then
BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} "$((32 * 1024))" "${YAML_CONF}")
fi
NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" "${YAML_CONF}")
FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1))
FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi

# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}")
FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
Expand Down
13 changes: 5 additions & 8 deletions flink-dist/src/main/flink-bin/bin/taskmanager.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ if [[ $STARTSTOP == "start" ]]; then
if [ "${FLINK_TM_HEAP}" -gt "0" ]; then

TM_HEAP_SIZE=${FLINK_TM_HEAP}
TM_OFFHEAP_SIZE=0
# some space for Netty initialization
NETTY_BUFFERS=1
# This is only an upper bound; much less direct memory will actually be used
TM_MAX_OFFHEAP_SIZE=${FLINK_TM_HEAP}

if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
Expand All @@ -70,7 +69,6 @@ if [[ $STARTSTOP == "start" ]]; then
echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
exit 1
fi
TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
else
# We calculate the memory using a fraction of the total memory
Expand All @@ -79,13 +77,12 @@ if [[ $STARTSTOP == "start" ]]; then
exit 1
fi
# recalculate the JVM heap memory by taking the off-heap ratio into account
TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
OFFHEAP_MANAGED_MEMORY_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
fi
fi

TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - NETTY_BUFFERS))
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}M"

fi

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1590,36 +1590,14 @@ object TaskManager {
fraction).toLong

LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
s" heap memory (${relativeMemSize >> 20} MB).")
s"heap memory (${relativeMemSize >> 20} MB).")

relativeMemSize
}
else if (memType == MemoryType.OFF_HEAP) {

val networkBufferSizeNew = configuration.getLong(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)

val networkBufferSizeOld = configuration.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
val networkBufferSize =
if (networkBufferSizeNew != -1) {
networkBufferSizeNew
} else if (networkBufferSizeOld == -1) {
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
} else {
networkBufferSizeOld
}

val numNetworkBuffers = configuration.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)

// direct memory for Netty's off-heap buffers
val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20)

// The maximum heap memory has been adjusted according to the fraction
val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + networkMemory
val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong

LOG.info(s"Using $fraction of the maximum memory size for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,8 @@ public boolean accept(File dir, String name) {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
// between heap and off-heap memory (see {@link ApplicationMasterActor}).
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,11 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
log.info("Create container launch context.")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])

val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)
val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)

val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")
s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${memoryLimit}m $javaOpts")

if (hasLogback || hasLog4j) {
tmCommand ++=
Expand Down Expand Up @@ -621,46 +621,32 @@ trait ApplicationMasterActor extends FlinkActor {
}

/**
* Calculate the correct JVM heap and off-heap memory limits.
* Calculate the correct JVM heap memory limit.
* @param memoryLimit The maximum memory in megabytes.
* @param streamingMode True if this is a streaming cluster.
* @return The JVM heap memory limit in megabytes.
*/
private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {

// The new config entry overrides the old one
val networkBufferSizeOld = flinkConfiguration.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)

val networkBufferSize = flinkConfiguration.getLong(
ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
networkBufferSizeOld)

val numNetworkBuffers = flinkConfiguration.getLong(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)

// direct memory for Netty's off-heap buffers
val networkMemory = ((numNetworkBuffers * networkBufferSize) >> 20) + 1
private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = {

val useOffHeap = flinkConfiguration.getBoolean(
ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)

if (useOffHeap && !streamingMode){
val fixedOffHeapSize = flinkConfiguration.getLong(
ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)

if (fixedOffHeapSize > 0) {
(memoryLimit - fixedOffHeapSize - networkMemory, fixedOffHeapSize + networkMemory)
memoryLimit - fixedOffHeapSize
} else {
val fraction = flinkConfiguration.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
val offHeapSize = (fraction * memoryLimit).toLong
(memoryLimit - offHeapSize - networkMemory, offHeapSize + networkMemory)
memoryLimit - offHeapSize
}

} else {
(memoryLimit - networkMemory, networkMemory)
memoryLimit
}
}
}

0 comments on commit 011cbbf

Please sign in to comment.