Skip to content

Commit

Permalink
[hotfix][core][config] Use the new type definition for config options…
Browse files Browse the repository at this point in the history
… of duration types in TaskManagerOptions.
  • Loading branch information
xintongsong authored and tillrohrmann committed Jan 3, 2020
1 parent 702a016 commit 29c718d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 25 deletions.
16 changes: 8 additions & 8 deletions docs/_includes/generated/task_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,26 @@
</tr>
<tr>
<td><h5>taskmanager.registration.initial-backoff</h5></td>
<td style="word-wrap: break-word;">"500 ms"</td>
<td>String</td>
<td style="word-wrap: break-word;">500ms</td>
<td>Duration</td>
<td>The initial registration backoff between two consecutive registration attempts. The backoff is doubled for each new registration attempt until it reaches the maximum registration backoff.</td>
</tr>
<tr>
<td><h5>taskmanager.registration.max-backoff</h5></td>
<td style="word-wrap: break-word;">"30 s"</td>
<td>String</td>
<td style="word-wrap: break-word;">30000ms</td>
<td>Duration</td>
<td>The maximum registration backoff between two consecutive registration attempts. The max registration backoff requires a time unit specifier (ms/s/min/h/d).</td>
</tr>
<tr>
<td><h5>taskmanager.registration.refused-backoff</h5></td>
<td style="word-wrap: break-word;">"10 s"</td>
<td>String</td>
<td style="word-wrap: break-word;">10000ms</td>
<td>Duration</td>
<td>The backoff after a registration has been refused by the job manager before retrying to connect.</td>
</tr>
<tr>
<td><h5>taskmanager.registration.timeout</h5></td>
<td style="word-wrap: break-word;">"5 min"</td>
<td>String</td>
<td style="word-wrap: break-word;">300000ms</td>
<td>Duration</td>
<td>Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.annotation.docs.ConfigGroups;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.util.TimeUtils;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.text;
Expand Down Expand Up @@ -104,39 +107,43 @@ public class TaskManagerOptions {
* The initial registration backoff between two consecutive registration attempts. The backoff
* is doubled for each new registration attempt until it reaches the maximum registration backoff.
*/
public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
public static final ConfigOption<Duration> INITIAL_REGISTRATION_BACKOFF =
key("taskmanager.registration.initial-backoff")
.defaultValue("500 ms")
.durationType()
.defaultValue(TimeUtils.parseDuration("500 ms"))
.withDeprecatedKeys("taskmanager.initial-registration-pause")
.withDescription("The initial registration backoff between two consecutive registration attempts. The backoff" +
" is doubled for each new registration attempt until it reaches the maximum registration backoff.");

/**
* The maximum registration backoff between two consecutive registration attempts.
*/
public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
public static final ConfigOption<Duration> REGISTRATION_MAX_BACKOFF =
key("taskmanager.registration.max-backoff")
.defaultValue("30 s")
.durationType()
.defaultValue(TimeUtils.parseDuration("30 s"))
.withDeprecatedKeys("taskmanager.max-registration-pause")
.withDescription("The maximum registration backoff between two consecutive registration attempts. The max" +
" registration backoff requires a time unit specifier (ms/s/min/h/d).");

/**
* The backoff after a registration has been refused by the job manager before retrying to connect.
*/
public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
public static final ConfigOption<Duration> REFUSED_REGISTRATION_BACKOFF =
key("taskmanager.registration.refused-backoff")
.defaultValue("10 s")
.durationType()
.defaultValue(TimeUtils.parseDuration("10 s"))
.withDeprecatedKeys("taskmanager.refused-registration-pause")
.withDescription("The backoff after a registration has been refused by the job manager before retrying to connect.");

/**
* Defines the timeout it can take for the TaskManager registration. If the duration is
* exceeded without a successful registration, then the TaskManager terminates.
*/
public static final ConfigOption<String> REGISTRATION_TIMEOUT =
public static final ConfigOption<Duration> REGISTRATION_TIMEOUT =
key("taskmanager.registration.timeout")
.defaultValue("5 min")
.durationType()
.defaultValue(TimeUtils.parseDuration("5 min"))
.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
" exceeded without a successful registration, then the TaskManager terminates.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -219,7 +218,7 @@ public static TaskManagerConfiguration fromConfiguration(

Time finiteRegistrationDuration;
try {
Duration maxRegistrationDuration = TimeUtils.parseDuration(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
Duration maxRegistrationDuration = configuration.get(TaskManagerOptions.REGISTRATION_TIMEOUT);
finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
} catch (IllegalArgumentException e) {
LOG.warn("Invalid format for parameter {}. Set the timeout to be infinite.",
Expand All @@ -229,7 +228,7 @@ public static TaskManagerConfiguration fromConfiguration(

final Time initialRegistrationPause;
try {
Duration pause = TimeUtils.parseDuration(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
Duration pause = configuration.get(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF);
initialRegistrationPause = Time.milliseconds(pause.toMillis());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
Expand All @@ -238,7 +237,7 @@ public static TaskManagerConfiguration fromConfiguration(

final Time maxRegistrationPause;
try {
Duration pause = TimeUtils.parseDuration(configuration.getString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
Duration pause = configuration.get(TaskManagerOptions.REGISTRATION_MAX_BACKOFF);
maxRegistrationPause = Time.milliseconds(pause.toMillis());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
Expand All @@ -247,7 +246,7 @@ public static TaskManagerConfiguration fromConfiguration(

final Time refusedRegistrationPause;
try {
Duration pause = TimeUtils.parseDuration(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
Duration pause = configuration.get(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF);
refusedRegistrationPause = Time.milliseconds(pause.toMillis());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid format for parameter " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
Expand Down Expand Up @@ -1313,7 +1314,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception {

@Test
public void testMaximumRegistrationDuration() throws Exception {
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));

final TaskExecutor taskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().build());

Expand All @@ -1332,7 +1333,7 @@ public void testMaximumRegistrationDuration() throws Exception {

@Test
public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("100 ms"));
final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;

import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void after() throws Exception {
public void testShouldShutdownOnFatalError() throws Exception {
Configuration configuration = createConfiguration();
// very high timeout, to ensure that we don't fail because of registration timeouts
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "42 h");
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("42 h"));
taskManagerRunner = createTaskManagerRunner(configuration);

taskManagerRunner.onFatalError(new RuntimeException());
Expand All @@ -77,7 +78,7 @@ public void testShouldShutdownOnFatalError() throws Exception {
@Test
public void testShouldShutdownIfRegistrationWithJobManagerFails() throws Exception {
Configuration configuration = createConfiguration();
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
configuration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));
taskManagerRunner = createTaskManagerRunner(configuration);

Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
Expand Down

0 comments on commit 29c718d

Please sign in to comment.