Skip to content

Commit

Permalink
[FLINK-16745][clients] Parse JobManagerProcessSpec from Configuration…
Browse files Browse the repository at this point in the history
… into ClusterSpecification
  • Loading branch information
azagrebin committed Apr 27, 2020
1 parent 0f5132d commit 113f43b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -36,8 +37,10 @@ public abstract class AbstractContainerizedClusterClientFactory<ClusterID> imple
public ClusterSpecification getClusterSpecification(Configuration configuration) {
checkNotNull(configuration);

final int jobManagerMemoryMB = ConfigurationUtils
.getJobManagerHeapMemory(configuration)
final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithFallbackForLegacyHeap(
configuration,
JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();

final int taskManagerMemoryMB = TaskExecutorProcessUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,6 @@ public class ConfigurationUtils {

private static final String[] EMPTY = new String[0];

/**
* Get job manager's heap memory. This method will check the new key
* {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
* the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
*
* @param configuration the configuration object
* @return the memory size of job manager's heap memory.
*/
public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
return configuration.get(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
} else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
return MemorySize.ofMebiBytes(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB));
} else {
//use default value
return JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
}
}

/**
* @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code Optional.empty()} if
* {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
import org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils;
import org.apache.flink.runtime.util.config.memory.LegacyMemoryOptions;
import org.apache.flink.runtime.util.config.memory.MemoryBackwardsCompatibilityUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.runtime.util.config.memory.jobmanager.JobManagerFlinkMemory;
Expand Down Expand Up @@ -64,7 +65,14 @@ public class JobManagerProcessUtils {
private JobManagerProcessUtils() {
}

public static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
public static JobManagerProcessSpec processSpecFromConfigWithFallbackForLegacyHeap(
Configuration config,
ConfigOption<MemorySize> newFallbackOptionForLegacyHeap) {
return processSpecFromConfig(
getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(config, newFallbackOptionForLegacyHeap));
}

static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
return createMemoryProcessSpec(PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
}

Expand All @@ -73,9 +81,16 @@ private static JobManagerProcessSpec createMemoryProcessSpec(
return new JobManagerProcessSpec(processMemory.getFlinkMemory(), processMemory.getJvmMetaspaceAndOverhead());
}

public static Configuration getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
static Configuration getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
Configuration configuration,
ConfigOption<MemorySize> configOption) {
return LEGACY_MEMORY_UTILS.getConfWithLegacyHeapSizeMappedToNewConfigOption(configuration, configOption);
}

@VisibleForTesting
public static JobManagerProcessSpec createDefaultJobManagerProcessSpec(int totalProcessMemoryMb) {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalProcessMemoryMb));
return processSpecFromConfig(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,12 @@ private static Configuration getConfigurationForStandaloneTaskManagers(String[]
}

private static void getJmResourceParams(String[] args) throws Exception {
Configuration configuration = getConfigurationForStandaloneJobManager(args);
JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(configuration);
JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfigWithFallbackForLegacyHeap(
FlinkConfigLoader.loadConfiguration(args),
JobManagerOptions.TOTAL_FLINK_MEMORY);
System.out.println(EXECUTION_PREFIX + ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
}

private static Configuration getConfigurationForStandaloneJobManager(String[] args) throws Exception {
Configuration configuration = FlinkConfigLoader.loadConfiguration(args);
return JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
configuration, JobManagerOptions.TOTAL_FLINK_MEMORY);
}

/**
* Commands that BashJavaUtils supports.
*/
Expand Down

0 comments on commit 113f43b

Please sign in to comment.