Skip to content

Commit

Permalink
[FLINK-22048] Remove akka.transport.* config options
Browse files Browse the repository at this point in the history
Since Flink uses TCP connections for Akka, we don't need to configure the
transport failure detector and the exposed configuration options are meaningless.
Consequently, this commit removes them.

This closes apache#15433.
  • Loading branch information
tillrohrmann committed Apr 1, 2021
1 parent c36bce8 commit c0eb9d7
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 94 deletions.
18 changes: 0 additions & 18 deletions docs/layouts/shortcodes/generated/akka_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,5 @@
<td>Integer</td>
<td>Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.</td>
</tr>
<tr>
<td><h5>akka.transport.heartbeat.interval</h5></td>
<td style="word-wrap: break-word;">"1000 s"</td>
<td>String</td>
<td>Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d).</td>
</tr>
<tr>
<td><h5>akka.transport.heartbeat.pause</h5></td>
<td style="word-wrap: break-word;">"6000 s"</td>
<td>String</td>
<td>Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d).</td>
</tr>
<tr>
<td><h5>akka.transport.threshold</h5></td>
<td style="word-wrap: break-word;">300.0</td>
<td>Double</td>
<td>Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,6 @@ public class AkkaOptions {
.withDescription(
"Timeout after which the startup of a remote component is considered being failed.");

/** Heartbeat interval of the transport failure detector. */
public static final ConfigOption<String> TRANSPORT_HEARTBEAT_INTERVAL =
ConfigOptions.key("akka.transport.heartbeat.interval")
.stringType()
.defaultValue("1000 s")
.withDescription(
"Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector"
+ " is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In"
+ " case you should need the transport failure detector, set the interval to some reasonable value. The"
+ " interval value requires a time-unit specifier (ms/s/min/h/d).");

/** Allowed heartbeat pause for the transport failure detector. */
public static final ConfigOption<String> TRANSPORT_HEARTBEAT_PAUSE =
ConfigOptions.key("akka.transport.heartbeat.pause")
.stringType()
.defaultValue("6000 s")
.withDescription(
"Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the"
+ " detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value."
+ " In case you should need the transport failure detector, set the pause to some reasonable value."
+ " The pause value requires a time-unit specifier (ms/s/min/h/d).");

/** Detection threshold of transport failure detector. */
public static final ConfigOption<Double> TRANSPORT_THRESHOLD =
ConfigOptions.key("akka.transport.threshold")
.doubleType()
.defaultValue(300.0)
.withDescription(
"Threshold for the transport failure detector. Since Flink uses TCP, the detector is not"
+ " necessary and, thus, the threshold is set to a high value.");

/** Override SSL support for the Akka transport. */
public static final ConfigOption<Boolean> SSL_ENABLED =
ConfigOptions.key("akka.ssl.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,20 +388,6 @@ object AkkaUtils {
ConfigFactory.parseString(config)
}

private def validateHeartbeat(pauseParamName: String,
pauseValue: time.Duration,
intervalParamName: String,
intervalValue: time.Duration): Unit = {
if (pauseValue.compareTo(intervalValue) <= 0) {
throw new IllegalConfigurationException(
"%s [%s] must greater than %s [%s]",
pauseParamName,
pauseValue,
intervalParamName,
intervalValue)
}
}

/**
* Creates a Akka config for a remote actor system listening on port on the network interface
* identified by bindAddress.
Expand Down Expand Up @@ -430,24 +416,6 @@ object AkkaUtils {
AkkaOptions.STARTUP_TIMEOUT,
TimeUtils.getStringInMillis(akkaAskTimeout.multipliedBy(10L)))))

val transportHeartbeatIntervalDuration = TimeUtils.parseDuration(
configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL))

val transportHeartbeatPauseDuration = TimeUtils.parseDuration(
configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE))

validateHeartbeat(
AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
transportHeartbeatPauseDuration,
AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(),
transportHeartbeatIntervalDuration)

val transportHeartbeatInterval = TimeUtils.getStringInMillis(transportHeartbeatIntervalDuration)

val transportHeartbeatPause = TimeUtils.getStringInMillis(transportHeartbeatPauseDuration)

val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)

val akkaTCPTimeout = TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)))

Expand Down Expand Up @@ -525,10 +493,11 @@ object AkkaUtils {
| remote {
| startup-timeout = $startupTimeout
|
| # disable the transport failure detector by setting very high values
| transport-failure-detector{
| acceptable-heartbeat-pause = $transportHeartbeatPause
| heartbeat-interval = $transportHeartbeatInterval
| threshold = $transportThreshold
| acceptable-heartbeat-pause = 6000 s
| heartbeat-interval = 1000 s
| threshold = 300
| }
|
| netty {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ class AkkaUtilsTest
with Matchers
with BeforeAndAfterAll {

test("getAkkaConfig should validate transport heartbeats") {
val configuration = new Configuration()
configuration.setString(
AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(),
AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.defaultValue())
intercept[IllegalConfigurationException] {
AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
}
}

test("getHostFromAkkaURL should return the correct host from a remote Akka URL") {
val host = "127.0.0.1"
val port = 1234
Expand Down

0 comments on commit c0eb9d7

Please sign in to comment.