[FLINK-13986][core][config] Remove legacy config option TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY.
xintongsong authored and azagrebin committed Dec 3, 2019
1 parent d24c324 commit ba1d711
Showing 18 changed files with 21 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docs/ops/config.md
@@ -92,7 +92,7 @@ Configuration options to control Flink's restart behaviour in case of job failur

{% include generated/task_manager_configuration.html %}

- For *batch* jobs Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.size minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
+ For *batch* jobs Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.memory.total-process.size minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the taskmanager.memory.fraction parameter. An absolute value may be set using taskmanager.memory.size (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.
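
As a rough illustration of the arithmetic in the documentation paragraph above, the following Java sketch computes the managed memory share from a total size, a network buffer size, and the fraction. The class name, the method name, and the 128 MB network buffer figure are assumptions made for this example; Flink's actual computation may account for additional memory components.

```java
// Worked example only: models "fraction * (total - network buffers)".
// All names and numbers here are hypothetical and not taken from Flink.
final class ManagedMemoryExample {

    /** Returns the managed memory in megabytes, rounded down. */
    static long managedMemoryMb(long totalMb, long networkMb, double fraction) {
        if (networkMb > totalMb) {
            throw new IllegalArgumentException("network buffers exceed total memory");
        }
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be within [0, 1]");
        }
        long freeMb = totalMb - networkMb;   // memory left after network buffers
        return (long) (fraction * freeMb);   // share handed to managed memory
    }

    public static void main(String[] args) {
        // 1024 MB total, an assumed 128 MB for network buffers, default fraction 0.7
        long managed = managedMemoryMb(1024, 128, 0.7);
        System.out.println("managed memory: " + managed + " MB");
    }
}
```

With these assumed inputs, 896 MB remain after network buffers, and the 0.7 fraction yields roughly 627 MB of managed memory.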

2 changes: 1 addition & 1 deletion docs/ops/config.zh.md
@@ -91,7 +91,7 @@ Configuration options to control Flink's restart behaviour in case of job failur

{% include generated/task_manager_configuration.html %}

- For *batch* jobs Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.size minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
+ For *batch* jobs Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.memory.total-process.size minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the taskmanager.memory.fraction parameter. An absolute value may be set using taskmanager.memory.size (overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.

2 changes: 1 addition & 1 deletion docs/ops/deployment/cluster_setup.md
@@ -107,7 +107,7 @@ Please see the [configuration page](../config.html) for details and additional c
In particular,

* the amount of available memory per JobManager (`jobmanager.heap.size`),
- * the amount of available memory per TaskManager (`taskmanager.heap.size`),
+ * the amount of available memory per TaskManager (`taskmanager.memory.total-process.size`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`io.tmp.dirs`)
2 changes: 1 addition & 1 deletion docs/ops/deployment/cluster_setup.zh.md
@@ -107,7 +107,7 @@ Please see the [configuration page](../config.html) for details and additional c
In particular,

* the amount of available memory per JobManager (`jobmanager.heap.size`),
- * the amount of available memory per TaskManager (`taskmanager.heap.size`),
+ * the amount of available memory per TaskManager (`taskmanager.memory.total-process.size`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`io.tmp.dirs`)
2 changes: 1 addition & 1 deletion docs/ops/deployment/kubernetes.md
@@ -129,7 +129,7 @@ data:
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
- taskmanager.heap.size: 1024m
+ taskmanager.memory.total-process.size: 1024m
log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
2 changes: 1 addition & 1 deletion docs/ops/deployment/kubernetes.zh.md
@@ -129,7 +129,7 @@ data:
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
- taskmanager.heap.size: 1024m
+ taskmanager.memory.total-process.size: 1024m
log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
4 changes: 2 additions & 2 deletions docs/ops/deployment/mesos.md
@@ -219,7 +219,7 @@ For example:
-Djobmanager.rpc.port=6123 \
-Drest.port=8081 \
-Dmesos.resourcemanager.tasks.mem=4096 \
- -Dtaskmanager.heap.size=3500m \
+ -Dtaskmanager.memory.total-process.size=3500m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10

@@ -237,7 +237,7 @@ Here is an example configuration for Marathon:

{
"id": "flink",
- "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.size=1024m -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.size=1024m -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
+ "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.size=1024m -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.memory.total-process.size=1024m -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
"cpus": 1.0,
"mem": 1024
}
2 changes: 1 addition & 1 deletion docs/ops/deployment/mesos.zh.md
@@ -219,7 +219,7 @@ For example:
-Djobmanager.rpc.port=6123 \
-Drest.port=8081 \
-Dmesos.resourcemanager.tasks.mem=4096 \
- -Dtaskmanager.heap.size=3500m \
+ -Dtaskmanager.memory.total-process.size=3500m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10

@@ -39,21 +39,10 @@ public class TaskManagerOptions {
// General TaskManager Options
// ------------------------------------------------------------------------

- /**
- * JVM heap size for the TaskManagers with memory size.
- */
- @Deprecated
- public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
- key("taskmanager.heap.size")
- .defaultValue("1024m")
- .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
- " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
- " YARN container, minus a certain tolerance value.");
-
/**
* JVM heap size (in megabytes) for the TaskManagers.
*
- * @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
+ * @deprecated use {@link #TOTAL_PROCESS_MEMORY}
*/
@Deprecated
public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
@@ -255,7 +244,7 @@ public class TaskManagerOptions {
public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
key("taskmanager.memory.total-process.size")
.noDefaultValue()
- .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
+ .withDeprecatedKeys("taskmanager.heap.size")
.withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a"
+ " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On"
+ " containerized setups, this should be set to the container memory.");
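
The effect of `withDeprecatedKeys("taskmanager.heap.size")` can be pictured with a plain-Java sketch: the current key wins, and the legacy key is consulted only when the current key is absent. This is a simplified model over a `Map`, not Flink's `Configuration` implementation; `DeprecatedKeyLookup` and `totalProcessMemory` are hypothetical names introduced for this note.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified model of a config option with one deprecated key.
// Hypothetical helper, not part of Flink.
final class DeprecatedKeyLookup {

    static final String KEY = "taskmanager.memory.total-process.size";
    static final String DEPRECATED_KEY = "taskmanager.heap.size";

    /** Looks up the current key first and falls back to the deprecated key. */
    static Optional<String> totalProcessMemory(Map<String, String> conf) {
        String value = conf.get(KEY);
        if (value == null) {
            value = conf.get(DEPRECATED_KEY); // legacy configurations keep working
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new HashMap<>();
        legacy.put(DEPRECATED_KEY, "1024m");
        // Only the legacy key is set, so its value is returned.
        System.out.println(totalProcessMemory(legacy).orElse("<unset>"));
    }
}
```

Under this model, an existing `flink-conf.yaml` that still sets only `taskmanager.heap.size` would continue to resolve a value for the new option.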
2 changes: 1 addition & 1 deletion flink-dist/src/main/resources/flink-conf.yaml
@@ -44,7 +44,7 @@ jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM

- taskmanager.heap.size: 1024m
+ taskmanager.memory.total-process.size: 1024m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common.sh
@@ -154,7 +154,7 @@ function create_ha_config() {
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
- taskmanager.heap.size: 1024m
+ taskmanager.memory.total-process.size: 1024m
taskmanager.numberOfTaskSlots: ${TASK_SLOTS_PER_TM_HA}
#==============================================================================
@@ -28,7 +28,7 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
set_config_key "akka.ask.timeout" "60 s"
set_config_key "web.timeout" "60000"

- set_config_key "taskmanager.heap.size" "1024m" # 1024Mb x 5TMs = 5Gb total heap
+ set_config_key "taskmanager.memory.total-process.size" "1024m" # 1024Mb x 5TMs = 5Gb total heap

set_config_key "taskmanager.memory.size" "8" # 8Mb
set_config_key "taskmanager.network.memory.min" "128mb"
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -54,7 +54,7 @@ cd "$END_TO_END_DIR"

echo "[INFO]Preparing Flink cluster..."

- set_config_key "taskmanager.heap.size" "4096m"
+ set_config_key "taskmanager.memory.total-process.size" "4096m"
set_config_key "taskmanager.numberOfTaskSlots" "4"
set_config_key "parallelism.default" "4"
start_cluster
2 changes: 1 addition & 1 deletion flink-jepsen/src/jepsen/flink/db.clj
@@ -304,7 +304,7 @@
"-Djobmanager.heap.size=2048m"
"-Djobmanager.rpc.port=6123"
"-Dmesos.resourcemanager.tasks.mem=2048"
- "-Dtaskmanager.heap.size=2048m"
+ "-Dtaskmanager.memory.total-process.size=2048m"
"-Dmesos.resourcemanager.tasks.cpus=1"
"-Drest.bind-address=$(hostname -f)"))

@@ -506,11 +506,11 @@ private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configur
}

private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) {
- // backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB
+ // backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY_MB
// only when they are explicitly configured by the user
@SuppressWarnings("deprecation")
final boolean legacyConfigured =
- config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
final boolean configuredWithEnv = System.getenv("FLINK_TM_HEAP") != null;
return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured || configuredWithEnv;
}
@@ -543,7 +543,7 @@ public void testConfigTotalProcessMemoryLegacySize() {
final MemorySize totalProcessMemorySize = MemorySize.parse("2g");

@SuppressWarnings("deprecation")
- final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
+ final ConfigOption<String> legacyOption = TaskManagerOptions.TOTAL_PROCESS_MEMORY;

Configuration conf = new Configuration();
conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m");
@@ -47,7 +47,7 @@ public void createResourceManager_WithLessMemoryThanContainerizedHeapCutoffMin_S
final TestingRpcService rpcService = new TestingRpcService();
try {
final Configuration configuration = new Configuration();
- configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, new MemorySize(128 * 1024 * 1024).toString());
+ configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, new MemorySize(128 * 1024 * 1024).toString());
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 600);

final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager(
@@ -374,7 +374,7 @@ public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandL
if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
tmMemoryVal += "m";
}
- effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
+ effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal);
}

if (commandLine.hasOption(slots.getOpt())) {
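
The unit handling in this hunk (append `m` when the value passed on the command line carries no unit) can be sketched in isolation as follows. `MemoryArgNormalizer` and its `hasUnit` check are hypothetical stand-ins written for this note; they assume a value has a unit whenever it does not end in a digit, which may differ from what `MemorySize.MemoryUnit.hasUnit` does.

```java
// Stand-alone sketch of "default to megabytes when no unit is given".
// Hypothetical helper, not part of Flink.
final class MemoryArgNormalizer {

    /** Assumption: a value carries a unit if its last character is not a digit. */
    static boolean hasUnit(String text) {
        String trimmed = text.trim();
        return !trimmed.isEmpty()
                && !Character.isDigit(trimmed.charAt(trimmed.length() - 1));
    }

    /** Appends "m" to bare numbers and leaves values with a unit unchanged. */
    static String withDefaultUnit(String value) {
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("memory value must not be empty");
        }
        return hasUnit(trimmed) ? trimmed : trimmed + "m";
    }

    public static void main(String[] args) {
        System.out.println(withDefaultUnit("1024")); // bare number, gets the default unit
        System.out.println(withDefaultUnit("2g"));   // already has a unit, left as-is
    }
}
```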