From c0eb9d7cdecb56430831dfed505e5209f33a347c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 30 Mar 2021 14:17:41 +0200 Subject: [PATCH] [FLINK-22048] Remove akka.transport.* config options Since Flink uses TCP connections for Akka, we don't need to configure the transport failure detector and the exposed configuration options are meaningless. Consequently, this commit removes them. This closes #15433. --- .../generated/akka_configuration.html | 18 --------- .../flink/configuration/AkkaOptions.java | 31 --------------- .../apache/flink/runtime/akka/AkkaUtils.scala | 39 ++----------------- .../flink/runtime/akka/AkkaUtilsTest.scala | 10 ----- 4 files changed, 4 insertions(+), 94 deletions(-) diff --git a/docs/layouts/shortcodes/generated/akka_configuration.html b/docs/layouts/shortcodes/generated/akka_configuration.html index 2f3c5fec8afb3..8b9cd9b1cddde 100644 --- a/docs/layouts/shortcodes/generated/akka_configuration.html +++ b/docs/layouts/shortcodes/generated/akka_configuration.html @@ -128,23 +128,5 @@ Integer Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness. - -
akka.transport.heartbeat.interval
- "1000 s" - String - Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d). - - -
akka.transport.heartbeat.pause
- "6000 s" - String - Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d). - - -
akka.transport.threshold
- 300.0 - Double - Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value. - diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 116f8c110fdff..06273271c1c5b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -64,37 +64,6 @@ public class AkkaOptions { .withDescription( "Timeout after which the startup of a remote component is considered being failed."); - /** Heartbeat interval of the transport failure detector. */ - public static final ConfigOption TRANSPORT_HEARTBEAT_INTERVAL = - ConfigOptions.key("akka.transport.heartbeat.interval") - .stringType() - .defaultValue("1000 s") - .withDescription( - "Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector" - + " is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In" - + " case you should need the transport failure detector, set the interval to some reasonable value. The" - + " interval value requires a time-unit specifier (ms/s/min/h/d)."); - - /** Allowed heartbeat pause for the transport failure detector. */ - public static final ConfigOption TRANSPORT_HEARTBEAT_PAUSE = - ConfigOptions.key("akka.transport.heartbeat.pause") - .stringType() - .defaultValue("6000 s") - .withDescription( - "Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the" - + " detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value." - + " In case you should need the transport failure detector, set the pause to some reasonable value." - + " The pause value requires a time-unit specifier (ms/s/min/h/d)."); - - /** Detection threshold of transport failure detector. */ - public static final ConfigOption TRANSPORT_THRESHOLD = - ConfigOptions.key("akka.transport.threshold") - .doubleType() - .defaultValue(300.0) - .withDescription( - "Threshold for the transport failure detector. Since Flink uses TCP, the detector is not" - + " necessary and, thus, the threshold is set to a high value."); - /** Override SSL support for the Akka transport. */ public static final ConfigOption SSL_ENABLED = ConfigOptions.key("akka.ssl.enabled") diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index eb96729c3b92f..945d28c9cc880 100755 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -388,20 +388,6 @@ object AkkaUtils { ConfigFactory.parseString(config) } - private def validateHeartbeat(pauseParamName: String, - pauseValue: time.Duration, - intervalParamName: String, - intervalValue: time.Duration): Unit = { - if (pauseValue.compareTo(intervalValue) <= 0) { - throw new IllegalConfigurationException( - "%s [%s] must greater than %s [%s]", - pauseParamName, - pauseValue, - intervalParamName, - intervalValue) - } - } - /** * Creates a Akka config for a remote actor system listening on port on the network interface * identified by bindAddress. @@ -430,24 +416,6 @@ object AkkaUtils { AkkaOptions.STARTUP_TIMEOUT, TimeUtils.getStringInMillis(akkaAskTimeout.multipliedBy(10L))))) - val transportHeartbeatIntervalDuration = TimeUtils.parseDuration( - configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL)) - - val transportHeartbeatPauseDuration = TimeUtils.parseDuration( - configuration.getString(AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE)) - - validateHeartbeat( - AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(), - transportHeartbeatPauseDuration, - AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.key(), - transportHeartbeatIntervalDuration) - - val transportHeartbeatInterval = TimeUtils.getStringInMillis(transportHeartbeatIntervalDuration) - - val transportHeartbeatPause = TimeUtils.getStringInMillis(transportHeartbeatPauseDuration) - - val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD) - val akkaTCPTimeout = TimeUtils.getStringInMillis( TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT))) @@ -525,10 +493,11 @@ object AkkaUtils { | remote { | startup-timeout = $startupTimeout | + | # disable the transport failure detector by setting very high values | transport-failure-detector{ - | acceptable-heartbeat-pause = $transportHeartbeatPause - | heartbeat-interval = $transportHeartbeatInterval - | threshold = $transportThreshold + | acceptable-heartbeat-pause = 6000 s + | heartbeat-interval = 1000 s + | threshold = 300 | } | | netty { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index 0aa6a5e8d7b7f..83f1b29674b0f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -37,16 +37,6 @@ class AkkaUtilsTest with Matchers with BeforeAndAfterAll { - test("getAkkaConfig should validate transport heartbeats") { - val configuration = new Configuration() - configuration.setString( - AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE.key(), - AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL.defaultValue()) - intercept[IllegalConfigurationException] { - AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337))) - } - } - test("getHostFromAkkaURL should return the correct host from a remote Akka URL") { val host = "127.0.0.1" val port = 1234