Skip to content

Commit

Permalink
[FLINK-6469][core] Configure Memory Sizes with units
Browse files Browse the repository at this point in the history
This closes apache#5448
  • Loading branch information
yanghua authored and dawidwys committed Jul 5, 2018
1 parent abd61cf commit d02167d
Show file tree
Hide file tree
Showing 71 changed files with 579 additions and 246 deletions.
12 changes: 6 additions & 6 deletions docs/_includes/generated/common_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
</thead>
<tbody>
<tr>
<td><h5>jobmanager.heap.mb</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>JVM heap size (in megabytes) for the JobManager.</td>
<td><h5>jobmanager.heap.size</h5></td>
<td style="word-wrap: break-word;">"1024m"</td>
<td>JVM heap size for the JobManager.</td>
</tr>
<tr>
<td><h5>taskmanager.heap.mb</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
<td><h5>taskmanager.heap.size</h5></td>
<td style="word-wrap: break-word;">"1024m"</td>
<td>JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
</tr>
<tr>
<td><h5>parallelism.default</h5></td>
Expand Down
6 changes: 3 additions & 3 deletions docs/_includes/generated/job_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
<td>The maximum number of prior execution attempts kept in history.</td>
</tr>
<tr>
<td><h5>jobmanager.heap.mb</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>JVM heap size (in megabytes) for the JobManager.</td>
<td><h5>jobmanager.heap.size</h5></td>
<td style="word-wrap: break-word;">"1024m"</td>
<td>JVM heap size for the JobManager.</td>
</tr>
<tr>
<td><h5>jobmanager.resourcemanager.reconnect-interval</h5></td>
Expand Down
22 changes: 11 additions & 11 deletions docs/_includes/generated/task_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
<td>Whether the quarantine monitor for task managers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.</td>
</tr>
<tr>
<td><h5>taskmanager.heap.mb</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
<td><h5>taskmanager.heap.size</h5></td>
<td style="word-wrap: break-word;">"1024m"</td>
<td>JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
</tr>
<tr>
<td><h5>taskmanager.host</h5></td>
Expand Down Expand Up @@ -84,13 +84,13 @@
</tr>
<tr>
<td><h5>taskmanager.memory.segment-size</h5></td>
<td style="word-wrap: break-word;">32768</td>
<td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
<td style="word-wrap: break-word;">"32768"</td>
<td>Size of memory buffers used by the network stack and the memory manager.</td>
</tr>
<tr>
<td><h5>taskmanager.memory.size</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
<td style="word-wrap: break-word;">"0"</td>
<td>Amount of memory to be allocated by the task manager's memory manager. If not set, a relative fraction will be allocated.</td>
</tr>
<tr>
<td><h5>taskmanager.network.detailed-metrics</h5></td>
Expand All @@ -114,13 +114,13 @@
</tr>
<tr>
<td><h5>taskmanager.network.memory.max</h5></td>
<td style="word-wrap: break-word;">1073741824</td>
<td>Maximum memory size for network buffers (in bytes).</td>
<td style="word-wrap: break-word;">"1073741824"</td>
<td>Maximum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.min</h5></td>
<td style="word-wrap: break-word;">67108864</td>
<td>Minimum memory size for network buffers (in bytes).</td>
<td style="word-wrap: break-word;">"67108864"</td>
<td>Minimum memory size for network buffers.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.initial</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

/**
Expand Down Expand Up @@ -67,8 +68,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
int jobManagerMemoryMb = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
int taskManagerMemoryMb = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();

return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class KafkaShortRetentionTestBase implements Serializable {

private static Configuration getConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
return flinkConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
return flinkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void main(String[] args) throws Exception {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private abstract static class LimitNetworkBuffersTestEnvironment extends Executi
public static void setAsContext() {
Configuration config = new Configuration();
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
final LocalEnvironment le = new LocalEnvironment(config);

initializeContextEnvironment(new ExecutionEnvironmentFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void submitTopologyWithOpts(final String topologyName, final Map conf, fi
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

this.flink = new LocalFlinkMiniCluster(configuration, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,20 @@ public class JobManagerOptions {
" leader from potentially multiple standby JobManagers.");

/**
* JVM heap size (in megabytes) for the JobManager.
* JVM heap size for the JobManager with memory size.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
key("jobmanager.heap.size")
.defaultValue("1024m")
.withDescription("JVM heap size for the JobManager.");

/**
* JVM heap size (in megabytes) for the JobManager.
* @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
*/
@Deprecated
public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
key("jobmanager.heap.mb")
.defaultValue(1024)
.withDescription("JVM heap size (in megabytes) for the JobManager.");
Expand Down
Loading

0 comments on commit d02167d

Please sign in to comment.