
Commit

[FLINK-16745][coordination] Remove unused container cutoff
azagrebin committed Apr 27, 2020
1 parent 6c6e91d commit 679cd0f
Showing 14 changed files with 9 additions and 213 deletions.
12 changes: 0 additions & 12 deletions docs/_includes/generated/resource_manager_configuration.html
@@ -8,18 +8,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>containerized.heap-cutoff-min</h5></td>
<td style="word-wrap: break-word;">600</td>
<td>Integer</td>
<td>Minimum amount of heap memory to remove in Job Master containers, as a safety margin.</td>
</tr>
<tr>
<td><h5>containerized.heap-cutoff-ratio</h5></td>
<td style="word-wrap: break-word;">0.25</td>
<td>Float</td>
<td>Percentage of heap space to remove from Job Master containers (YARN / Mesos / Kubernetes), to compensate for other JVM memory usage.</td>
</tr>
<tr>
<td><h5>resourcemanager.job.timeout</h5></td>
<td style="word-wrap: break-word;">"5 minutes"</td>
5 changes: 2 additions & 3 deletions docs/ops/memory/mem_migration.md
@@ -125,9 +125,8 @@ Although, the network memory configuration has not changed too much it is recomm
It can change if other memory components have new sizes, e.g. the total memory which the network can be a fraction of.
See also [new detailed memory model](mem_detail.html).

The container cut-off configuration options, [`containerized.heap-cutoff-ratio`](config.html#containerized-heap-cutoff-ratio)
and [`containerized.heap-cutoff-min`](config.html#containerized-heap-cutoff-min), have no effect for task manager processes anymore
but they still have the same semantics for the job manager process. See also [how to migrate container cut-off](#container-cut-off-memory).
The container cut-off configuration options, `containerized.heap-cutoff-ratio` and `containerized.heap-cutoff-min`,
have no effect anymore. See also [how to migrate container cut-off](#container-cut-off-memory).
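For readers migrating, a minimal sketch of what this means in practice, assuming the new unified JobManager memory option `jobmanager.memory.process.size` (which is not part of this diff) as the replacement: the legacy cut-off keys can still be set without error, but nothing reads them anymore.

```java
import org.apache.flink.configuration.Configuration;

public class CutoffMigrationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Legacy keys: still accepted by the parser, but ignored after this commit.
        conf.setString("containerized.heap-cutoff-ratio", "0.25");
        conf.setString("containerized.heap-cutoff-min", "600");

        // Assumed replacement: size the whole JobManager process explicitly instead;
        // no cut-off is subtracted from it anymore.
        conf.setString("jobmanager.memory.process.size", "1600m");
    }
}
```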

## Total Memory (Previously Heap Memory)

5 changes: 1 addition & 4 deletions docs/ops/memory/mem_migration.zh.md
@@ -121,10 +121,7 @@ The [default flink-conf.yaml](#flink-confyaml-中的默认配置) shipped with Flink
The size of the network memory can change when other memory components change; for example, if the total memory changes, the network memory derived from it as a fraction may change as well.
See also the [detailed memory model](mem_detail.html).

The container cut-off configuration options ([`containerized.heap-cutoff-ratio`](config.html#containerized-heap-cutoff-ratio)
and [`containerized.heap-cutoff-min`](config.html#containerized-heap-cutoff-min)) no longer take effect for TaskExecutor processes.
For the JobManager process they still carry the same semantics as before.
See also [how to migrate the container cut-off memory](#容器切除cut-off内存).
The container cut-off configuration options (`containerized.heap-cutoff-ratio` and `containerized.heap-cutoff-min`) no longer take effect for any process.

## Total Memory (Previously Heap Memory)

@@ -392,14 +392,14 @@ public final class ConfigConstants {
/**
* Percentage of heap space to remove from containers (YARN / Mesos / Kubernetes), to compensate
* for other JVM memory usage.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";

/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
@@ -434,14 +434,14 @@ public final class ConfigConstants {

/**
* Percentage of heap space to remove from containers started by YARN.
* @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_RATIO}
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_RATIO = "yarn.heap-cutoff-ratio";

/**
* Minimum amount of memory to remove from the heap space as a safety margin.
* @deprecated in favor of {@code #CONTAINERIZED_HEAP_CUTOFF_MIN}
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final String YARN_HEAP_CUTOFF_MIN = "yarn.heap-cutoff-min";
@@ -1526,15 +1526,15 @@ public final class ConfigConstants {
/**
* Minimum amount of memory to subtract from the process memory to get the TaskManager
* heap size. We came up with these values experimentally.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;

/**
* Relative amount of memory to subtract from Java process memory to get the TaskManager
* heap size.
* @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
* @deprecated Not used anymore, but remain here until Flink 2.0
*/
@Deprecated
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
@@ -56,26 +56,6 @@ public class ResourceManagerOptions {
" default, the port of the JobManager, because the same ActorSystem is used." +
" Its not possible to use this configuration key to define port ranges.");

/**
* Percentage of heap space to remove from Job Master containers (YARN / Mesos/ Kubernetes), to compensate
* for other JVM memory usage.
*/
public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
.key("containerized.heap-cutoff-ratio")
.defaultValue(0.25f)
.withDeprecatedKeys("yarn.heap-cutoff-ratio")
.withDescription("Percentage of heap space to remove from Job Master containers (YARN / Mesos / Kubernetes), " +
"to compensate for other JVM memory usage.");

/**
* Minimum amount of heap memory to remove in Job Master containers, as a safety margin.
*/
public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions
.key("containerized.heap-cutoff-min")
.defaultValue(600)
.withDeprecatedKeys("yarn.heap-cutoff-min")
.withDescription("Minimum amount of heap memory to remove in Job Master containers, as a safety margin.");

/**
* The timeout for a slot request to be discarded, in milliseconds.
* @deprecated Use {@link JobManagerOptions#SLOT_REQUEST_TIMEOUT}.
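The option definitions removed above wired in the old YARN keys via `withDeprecatedKeys`, so a value set under `yarn.heap-cutoff-ratio` used to resolve through the containerized option. A minimal sketch of that fallback behaviour, with the constant re-declared purely for illustration (it no longer exists in `ResourceManagerOptions`):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DeprecatedKeyFallbackSketch {
    // Illustrative re-declaration of the removed option.
    private static final ConfigOption<Float> HEAP_CUTOFF_RATIO = ConfigOptions
            .key("containerized.heap-cutoff-ratio")
            .defaultValue(0.25f)
            .withDeprecatedKeys("yarn.heap-cutoff-ratio");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setFloat("yarn.heap-cutoff-ratio", 0.15f); // only the legacy YARN key is set

        // Resolution falls back to the deprecated key instead of the 0.25 default.
        System.out.println(conf.getFloat(HEAP_CUTOFF_RATIO)); // prints 0.15
    }
}
```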
1 change: 0 additions & 1 deletion flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
@@ -123,7 +123,6 @@ function start_hadoop_cluster_and_prepare_flink() {
security.kerberos.login.keytab: /home/hadoop-user/hadoop-user.keytab
security.kerberos.login.principal: hadoop-user
slot.request.timeout: 120000
containerized.heap-cutoff-min: 100
END
)
docker exec master bash -c "echo \"$FLINK_CONFIG\" > /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
@@ -25,7 +25,6 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
@@ -687,31 +686,4 @@ public static String escapeWithSingleQuote(String value) {
public static String escapeWithDoubleQuote(String value) {
return "\"" + WINDOWS_DOUBLE_QUOTE_ESCAPER.escape(value) + "\"";
}

/**
* Calculate heap size after cut-off. The heap size after cut-off will be used to set -Xms and -Xmx for jobmanager
* start command.
*/
public static int calculateHeapSize(int memory, Configuration conf) {

final float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
final int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
}
if (minCutoff > memory) {
throw new IllegalArgumentException("The configuration value '"
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
}

int heapLimit = (int) ((float) memory * memoryCutoffRatio);
if (heapLimit < minCutoff) {
heapLimit = minCutoff;
}
return memory - heapLimit;
}
}
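To make the removed arithmetic concrete, here is a standalone sketch of the same formula, re-implemented only for illustration (the method above is gone): with the former defaults of ratio 0.25 and minimum 600 MB, a 1024 MB JobManager container ended up with a 424 MB heap, because the 600 MB floor exceeded the 256 MB ratio-based cut-off.

```java
public class HeapCutoffSketch {
    /** Re-implementation of the removed cut-off formula, for illustration only. */
    static int heapAfterCutoff(int containerMemoryMb, float cutoffRatio, int minCutoffMb) {
        int cutoff = (int) (containerMemoryMb * cutoffRatio);
        if (cutoff < minCutoffMb) {
            cutoff = minCutoffMb; // the fixed safety margin wins over the ratio
        }
        return containerMemoryMb - cutoff;
    }

    public static void main(String[] args) {
        // Former defaults: ratio 0.25, minimum 600 MB.
        System.out.println(heapAfterCutoff(1024, 0.25f, 600));  // 424
        System.out.println(heapAfterCutoff(10000, 0.25f, 600)); // 7500
    }
}
```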
@@ -27,7 +27,6 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.ExceptionUtils;
@@ -475,58 +474,6 @@ public void testEscapeDynamicPropertyValueWithDoubleQuote() {
assertEquals("\"\\\"foo\\\" \\\"bar\\\"\"", BootstrapTools.escapeWithDoubleQuote(value3));
}

@Test
public void testHeapCutoff() {
Configuration conf = new Configuration();
conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);

Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, BootstrapTools.calculateHeapSize(10000, conf));

// test different configuration
Assert.assertEquals(3400, BootstrapTools.calculateHeapSize(4000, conf));

conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000");
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1");
Assert.assertEquals(3000, BootstrapTools.calculateHeapSize(4000, conf));

conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5");
Assert.assertEquals(2000, BootstrapTools.calculateHeapSize(4000, conf));

conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1");
Assert.assertEquals(0, BootstrapTools.calculateHeapSize(4000, conf));

// test also deprecated keys
conf = new Configuration();
conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);

Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, BootstrapTools.calculateHeapSize(10000, conf));
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
final Configuration conf = new Configuration();
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
BootstrapTools.calculateHeapSize(4000, conf);
}

@Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() {
final Configuration conf = new Configuration();
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01");
BootstrapTools.calculateHeapSize(4000, conf);
}

@Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() {
final Configuration conf = new Configuration();
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000");
BootstrapTools.calculateHeapSize(4000, conf);
}

@Test
public void testGetEnvironmentVariables() {
Configuration testConf = new Configuration();

This file was deleted.

@@ -28,7 +28,6 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -285,9 +284,6 @@ private YarnClusterDescriptor setupYarnClusterDescriptor() {
flinkConfiguration.setString(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
flinkConfiguration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

final int minMemory = 100;
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, minMemory);

return createYarnClusterDescriptor(flinkConfiguration);
}

@@ -24,7 +24,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
@@ -498,9 +497,6 @@ private void testDetachedPerJobYarnClusterInternal(String job) throws Exception
"-yt", flinkLibFolder.getAbsolutePath(),
"-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-yjm", "768m",
// test if the cutoff is passed correctly (only useful when larger than the value
// of containerized.heap-cutoff-min (default: 600MB)
"-yD", ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "=0.7",
"-yD", YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag",
"-ytm", "1024m",
"-ys", "2", // test requesting slots from YARN.
@@ -580,7 +576,6 @@ public boolean accept(File dir, String name) {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 70% (NOT THE DEFAULT VALUE)
String expected = "Starting TaskManagers";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
content.contains(expected));
@@ -25,7 +25,6 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -85,9 +84,6 @@ public void testFlinkContainerMemory() throws Exception {
final YarnClient yarnClient = getYarnClient();
final Configuration configuration = new Configuration(flinkConfiguration);

// disable heap cutoff min
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

final int slotsPerTaskManager = 3;
configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
final int masterMemory = 768;
@@ -24,7 +24,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
@@ -101,7 +100,6 @@ public static void tearDownClass() {
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
final Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

YarnClusterDescriptor clusterDescriptor = createYarnClusterDescriptor(flinkConfiguration);

@@ -130,7 +128,6 @@ public void testConfigOverwrite() throws ClusterDeploymentException {
Configuration configuration = new Configuration();
// overwrite vcores in config
configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

YarnClusterDescriptor clusterDescriptor = createYarnClusterDescriptor(configuration);

