Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-16746] Deprecate legacy heap memory option for JM and expose the new ones in docs #11787

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[FLINK-16745][k8s] Start Kubernetes JM with FLIP-116 JVM memory args
This closes #11675.
  • Loading branch information
azagrebin committed Apr 22, 2020
commit 24e929ff94d35c4a38bda3e33f49d087858b3520
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ mkdir -p "$(dirname $LOCAL_OUTPUT_PATH)"
# Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube.
"$FLINK_DIR"/bin/kubernetes-session.sh -Dkubernetes.cluster-id=${CLUSTER_ID} \
-Dkubernetes.container.image=${FLINK_IMAGE_NAME} \
-Djobmanager.heap.size=512m \
-Dcontainerized.heap-cutoff-min=100 \
-Djobmanager.memory.process.size=1088m \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dkubernetes.taskmanager.cpu=0.5 \
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package org.apache.flink.kubernetes.kubeclient.decorators;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemorySpec;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
Expand All @@ -44,9 +48,12 @@ public JavaCmdJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobMa

@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithFallbackForLegacyHeap(
kubernetesJobManagerParameters.getFlinkConfiguration(),
JobManagerOptions.TOTAL_PROCESS_MEMORY);
final String startCommand = getJobManagerStartCommand(
kubernetesJobManagerParameters.getFlinkConfiguration(),
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
processSpec,
kubernetesJobManagerParameters.getFlinkConfDirInPod(),
kubernetesJobManagerParameters.getFlinkLogDirInPod(),
kubernetesJobManagerParameters.hasLogback(),
Expand All @@ -67,7 +74,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
* Generates the shell command to start a jobmanager for kubernetes.
*
* @param flinkConfig The Flink configuration.
* @param jobManagerMemoryMb JobManager heap size.
* @param jobManagerProcessSpec JobManager process memory spec.
* @param configDirectory The configuration directory for the flink-conf.yaml
* @param logDirectory The log directory.
* @param hasLogback Uses logback?
Expand All @@ -77,14 +84,13 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
*/
private static String getJobManagerStartCommand(
Configuration flinkConfig,
int jobManagerMemoryMb,
ProcessMemorySpec jobManagerProcessSpec,
String configDirectory,
String logDirectory,
boolean hasLogback,
boolean hasLog4j,
String mainClass) {
final int heapSize = BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfig);
final String jvmMemOpts = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize);
final String jvmMemOpts = ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
return KubernetesUtils.getCommonStartCommand(
flinkConfig,
KubernetesUtils.ClusterComponent.JOB_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class KubernetesTestBase extends TestLogger {
protected static final String CONTAINER_IMAGE = "flink-k8s-test:latest";
protected static final KubernetesConfigOptions.ImagePullPolicy CONTAINER_IMAGE_PULL_POLICY =
KubernetesConfigOptions.ImagePullPolicy.IfNotPresent;
protected static final int JOB_MANAGER_MEMORY = 768;

@Rule
public MixedKubernetesServer server = new MixedKubernetesServer(true, true);
Expand All @@ -74,6 +77,7 @@ public void setup() throws Exception {
flinkConfig.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, CONTAINER_IMAGE);
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, CONTAINER_IMAGE_PULL_POLICY);
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));

flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testCorrectSettingOfMaxSlots() throws Exception {
"-e", KubernetesSessionClusterExecutor.NAME,
"-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=3"};

final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1234);
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1234);

final Configuration executorConfig = cli.getEffectiveConfiguration(params);
final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
Expand All @@ -96,7 +96,7 @@ public void testCorrectSettingOfMaxSlots() throws Exception {

@Test
public void testResumeFromKubernetesID() throws Exception {
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);

final String clusterId = "my-test-CLUSTER_ID";
final String[] args = new String[] {
Expand All @@ -120,13 +120,13 @@ public void testCommandLineClusterSpecification() throws Exception {
final int taskManagerMemory = 7331;
final int slotsPerTaskManager = 30;

configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory));
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

final String[] args = {
"-e", KubernetesSessionClusterExecutor.NAME,
"-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=" + jobManagerMemory + "m",
"-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + jobManagerMemory + "m",
"-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=" + taskManagerMemory + "m",
"-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() + "=" + slotsPerTaskManager
};
Expand All @@ -150,7 +150,7 @@ public void testCommandLineClusterSpecification() throws Exception {
public void testConfigurationClusterSpecification() throws Exception {
final Configuration configuration = new Configuration();
final int jobManagerMemory = 1337;
configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory));
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory));
final int taskManagerMemory = 7331;
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory));
final int slotsPerTaskManager = 42;
Expand All @@ -175,11 +175,11 @@ public void testConfigurationClusterSpecification() throws Exception {
public void testHeapMemoryPropertyWithUnitMB() throws Exception {
final String[] args = new String[] {
"-e", KubernetesSessionClusterExecutor.NAME,
"-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1024m",
"-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1024m",
"-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=2048m"
};

final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);

final Configuration executorConfig = cli.getEffectiveConfiguration(args);
final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
Expand All @@ -196,11 +196,11 @@ public void testHeapMemoryPropertyWithUnitMB() throws Exception {
public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
final String[] args = new String[] {
"-e", KubernetesSessionClusterExecutor.NAME,
"-D" + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key() + "=1g",
"-D" + JobManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=1g",
"-D" + TaskManagerOptions.TOTAL_PROCESS_MEMORY.key() + "=3g"
};

final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);

final Configuration executorConfig = cli.getEffectiveConfiguration(args);
final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
"-e", KubernetesSessionClusterExecutor.NAME
};

final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithTmTotalMemory(1024);
final KubernetesSessionCli cli = createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(1024);

final Configuration executorConfig = cli.getEffectiveConfiguration(args);
final ClusterClientFactory<String> clientFactory = getClusterClientFactory(executorConfig);
Expand All @@ -254,8 +254,9 @@ private ClusterClientFactory<String> getClusterClientFactory(final Configuration
return clusterClientServiceLoader.getClusterClientFactory(executorConfig);
}

private KubernetesSessionCli createFlinkKubernetesCustomCliWithTmTotalMemory(int totalMemory) {
private KubernetesSessionCli createFlinkKubernetesCustomCliWithJmAndTmTotalMemory(int totalMemory) {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemory));
return new KubernetesSessionCli(configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.KubernetesTestBase;
Expand Down Expand Up @@ -83,6 +84,7 @@ public void setup() throws Exception {
this.flinkConfig.setString(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX + k, v));
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_LABELS, userLabels);
this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_NODE_SELECTOR, nodeSelector);
this.flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));

final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(JOB_MANAGER_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;

import io.fabric8.kubernetes.api.model.Container;
import org.junit.Before;
Expand Down Expand Up @@ -63,8 +65,8 @@ public class JavaCmdJobManagerDecoratorTest extends KubernetesJobManagerTestBase
FLINK_LOG_DIR_IN_POD, FLINK_LOG_DIR_IN_POD);

// Memory variables
private static final String jmJvmMem = String.format("-Xms%dm -Xmx%dm",
JOB_MANAGER_MEMORY - 600, JOB_MANAGER_MEMORY - 600);
private static final String jmJvmMem = ProcessMemoryUtils.generateJvmParametersStr(
JobManagerProcessUtils.createDefaultJobManagerProcessSpec(JOB_MANAGER_MEMORY));

private JavaCmdJobManagerDecorator javaCmdJobManagerDecorator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
*/
public class KubernetesJobManagerParametersTest extends KubernetesTestBase {

private static final int JOB_MANAGER_MEMORY = 768;
private static final double JOB_MANAGER_CPU = 2.0;

private final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
Expand Down