forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TaskManagerOptions.java
406 lines (362 loc) · 18.8 KB
/
TaskManagerOptions.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* The set of configuration options relating to TaskManager and Task settings.
*/
@PublicEvolving
public class TaskManagerOptions {
// ------------------------------------------------------------------------
// General TaskManager Options
// ------------------------------------------------------------------------
/**
* JVM heap size for the TaskManagers with memory size.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
key("taskmanager.heap.size")
.defaultValue("1024m")
.withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
" YARN container, minus a certain tolerance value.");
/**
* JVM heap size (in megabytes) for the TaskManagers.
*
* @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
*/
@Deprecated
public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
key("taskmanager.heap.mb")
.defaultValue(1024)
.withDescription("JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of" +
" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
" YARN container, minus a certain tolerance value.");
/**
* Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
*/
public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false)
.withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");
/**
* Whether the quarantine monitor for task managers shall be started. The quarantine monitor
* shuts down the actor system if it detects that it has quarantined another actor system
* or if it has been quarantined by another actor system.
*/
public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
key("taskmanager.exit-on-fatal-akka-error")
.defaultValue(false)
.withDescription("Whether the quarantine monitor for task managers shall be started. The quarantine monitor" +
" shuts down the actor system if it detects that it has quarantined another actor system" +
" or if it has been quarantined by another actor system.");
/**
* The config parameter defining the task manager's hostname.
*/
public static final ConfigOption<String> HOST =
key("taskmanager.host")
.noDefaultValue()
.withDescription("The hostname of the network interface that the TaskManager binds to. By default, the" +
" TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers." +
" This option can be used to define a hostname if that strategy fails for some reason. Because" +
" different TaskManagers need different values for this option, it usually is specified in an" +
" additional non-shared TaskManager-specific config file.");
/**
* The default network port range the task manager expects incoming IPC connections. The {@code "0"} means that
* the TaskManager searches for a free port.
*/
public static final ConfigOption<String> RPC_PORT =
key("taskmanager.rpc.port")
.defaultValue("0")
.withDescription("The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges" +
" (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid" +
" collisions when multiple TaskManagers are running on the same machine.");
/**
* The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
* the TaskManager searches for a free port.
*/
public static final ConfigOption<Integer> DATA_PORT =
key("taskmanager.data.port")
.defaultValue(0)
.withDescription("The task manager’s port used for data exchange operations.");
/**
* Config parameter to override SSL support for taskmanager's data transport.
*/
public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
key("taskmanager.data.ssl.enabled")
.defaultValue(true)
.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
/**
* The initial registration backoff between two consecutive registration attempts. The backoff
* is doubled for each new registration attempt until it reaches the maximum registration backoff.
*/
public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
key("taskmanager.registration.initial-backoff")
.defaultValue("500 ms")
.withDeprecatedKeys("taskmanager.initial-registration-pause")
.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
" is doubled for each new registration attempt until it reaches the maximum registration backoff.");
/**
* The maximum registration backoff between two consecutive registration attempts.
*/
public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
key("taskmanager.registration.max-backoff")
.defaultValue("30 s")
.withDeprecatedKeys("taskmanager.max-registration-pause")
.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
" registration backoff requires a time unit specifier (ms/s/min/h/d).");
/**
* The backoff after a registration has been refused by the job manager before retrying to connect.
*/
public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
key("taskmanager.registration.refused-backoff")
.defaultValue("10 s")
.withDeprecatedKeys("taskmanager.refused-registration-pause")
.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");
/**
* Defines the timeout it can take for the TaskManager registration. If the duration is
* exceeded without a successful registration, then the TaskManager terminates.
*/
public static final ConfigOption<String> REGISTRATION_TIMEOUT =
key("taskmanager.registration.timeout")
.defaultValue("5 min")
.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
" exceeded without a successful registration, then the TaskManager terminates.");
/**
* The config parameter defining the number of task slots of a task manager.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_PARALLELISM_SLOTS)
public static final ConfigOption<Integer> NUM_TASK_SLOTS =
key("taskmanager.numberOfTaskSlots")
.defaultValue(1)
.withDescription("The number of parallel operator or user function instances that a single TaskManager can" +
" run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or" +
" operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the" +
" available memory is divided between the different operator or function instances. This value" +
" is typically proportional to the number of physical CPU cores that the TaskManager's machine has" +
" (e.g., equal to the number of cores, or half the number of cores).");
public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
key("taskmanager.debug.memory.log")
.defaultValue(false)
.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
.withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.");
public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
key("taskmanager.debug.memory.log-interval")
.defaultValue(5000L)
.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
.withDescription("The interval (in ms) for the log thread to log the current memory usage.");
// ------------------------------------------------------------------------
// Managed Memory Options
// ------------------------------------------------------------------------
/**
* Size of memory buffers used by the network stack and the memory manager.
*/
public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
.defaultValue("32kb")
.withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Amount of memory to be allocated by the task manager's memory manager. If not
* set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
*/
public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
key("taskmanager.memory.size")
.defaultValue("0")
.withDescription("Amount of memory to be allocated by the task manager's memory manager." +
" If not set, a relative fraction will be allocated.");
/**
* Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
* not set.
*/
public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
key("taskmanager.memory.fraction")
.defaultValue(0.7f)
.withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
" buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
" For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
" for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
" created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
" is not set.");
/**
* Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
* as well as the network buffers.
**/
public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
key("taskmanager.memory.off-heap")
.defaultValue(false)
.withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
" TaskManager as well as the network buffers.");
/**
* Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
*/
public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
key("taskmanager.memory.preallocate")
.defaultValue(false)
.withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");
// ------------------------------------------------------------------------
// Network Options
// ------------------------------------------------------------------------
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f)
.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
"` and \"taskmanager.network.memory.max\" may override this fraction.");
/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue("64mb")
.withDescription("Minimum memory size for network buffers.");
/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");
/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
*/
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
key("taskmanager.network.detailed-metrics")
.defaultValue(false)
.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
/**
* Boolean flag to enable/disable network credit-based flow control.
*
* @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
* credit-based flow control.
*/
@Deprecated
public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
key("taskmanager.network.credit-model")
.defaultValue(true)
.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
.withDescription("Boolean flag to enable/disable network credit-based flow control.");
// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
/**
* Time interval in milliseconds between two successive task cancellation
* attempts.
*/
public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
key("task.cancellation.interval")
.defaultValue(30000L)
.withDeprecatedKeys("task.cancellation-interval")
.withDescription("Time interval between two successive task cancellation attempts in milliseconds.");
/**
* Timeout in milliseconds after which a task cancellation times out and
* leads to a fatal TaskManager error. A value of <code>0</code> deactivates
* the watch dog.
*/
public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
key("task.cancellation.timeout")
.defaultValue(180000L)
.withDescription("Timeout in milliseconds after which a task cancellation times out and" +
" leads to a fatal TaskManager error. A value of 0 deactivates" +
" the watch dog.");
/**
* This configures how long we wait for the timers in milliseconds to finish all pending timer threads
* when the stream task is cancelled.
*/
public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS = ConfigOptions
.key("task.cancellation.timers.timeout")
.defaultValue(7500L)
.withDeprecatedKeys("timerservice.exceptional.shutdown.timeout")
.withDescription("Time we wait for the timers in milliseconds to finish all pending timer threads" +
" when the stream task is cancelled.");
/**
* The maximum number of bytes that a checkpoint alignment may buffer.
* If the checkpoint alignment buffers more than the configured amount of
* data, the checkpoint is aborted (skipped).
*
* <p>The default value of {@code -1} indicates that there is no limit.
*/
public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
key("task.checkpoint.alignment.max-size")
.defaultValue(-1L)
.withDescription("The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint" +
" alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped)." +
" A value of -1 indicates that there is no limit.");
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
private TaskManagerOptions() {}
}