diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index 39b1f7165a0c2..eb9bc83f094ed 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -36,6 +36,14 @@ jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 +# The host interface the JobManager will bind to. My default, this is localhost, and will prevent +# the JobManager from communicating outside the machine/container it is running on. +# +# To enable this, set the bind-host address to one that has access to an outside facing network +# interface, such as 0.0.0.0. + +jobmanager.bind-host: localhost + # The total process memory size for the JobManager. # @@ -43,6 +51,13 @@ jobmanager.rpc.port: 6123 jobmanager.memory.process.size: 1600m +# The host interface the TaskManager will bind to. My default, this is localhost, and will prevent +# the TaskManager from communicating outside the machine/container it is running on. +# +# To enable this, set the bind-host address to one that has access to an outside facing network +# interface, such as 0.0.0.0. + +taskmanager.bind-host: localhost # The total process memory size for the TaskManager. # diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java index c0e7b415ec0ba..bcd18cb5b9ed9 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersBuilder.java @@ -165,6 +165,9 @@ public FlinkContainers build() { CHECKPOINT_PATH.toAbsolutePath().toUri().toString()); this.conf.set(RestOptions.BIND_ADDRESS, "0.0.0.0"); + this.conf.set(JobManagerOptions.BIND_HOST, "0.0.0.0"); + this.conf.set(TaskManagerOptions.BIND_HOST, "0.0.0.0"); + // Create temporary directory for building Flink image final Path imageBuildingTempDir; try { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index 61587cbf52a64..e1edc10a3b5a1 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -21,7 +21,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; @@ -158,6 +160,8 @@ private Map getClusterSidePropertiesMap(Configuration flinkConfi clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); return clusterSideConfig.toMap(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java index 9f61caedbe072..84849c97b362d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java @@ -58,6 +58,7 @@ public class ConnectionUtils { * state failed to determine the address. */ private enum AddressDetectionState { + LOOPBACK(100), /** Connect from interface returned by InetAddress.getLocalHost(). * */ LOCAL_HOST(200), /** Detect own IP address based on the target IP address. Look for common prefix */ @@ -115,6 +116,7 @@ public static InetAddress findConnectingAddress( final List strategies = Collections.unmodifiableList( Arrays.asList( + AddressDetectionState.LOOPBACK, AddressDetectionState.LOCAL_HOST, AddressDetectionState.ADDRESS, AddressDetectionState.FAST_CONNECT, @@ -225,6 +227,18 @@ private static InetAddress tryLocalHostBeforeReturning( private static InetAddress findAddressUsingStrategy( AddressDetectionState strategy, InetSocketAddress targetAddress, boolean logging) throws IOException { + if (strategy == AddressDetectionState.LOOPBACK) { + InetAddress loopback = InetAddress.getLoopbackAddress(); + + if (tryToConnect(loopback, targetAddress, strategy.getTimeout(), logging)) { + LOG.debug( + "Using InetAddress.getLoopbackAddress() immediately for connecting address"); + return loopback; + } else { + return null; + } + } + // try LOCAL_HOST strategy independent of the network interfaces if (strategy == AddressDetectionState.LOCAL_HOST) { InetAddress localhostName; @@ -432,7 +446,7 @@ public InetAddress findConnectingAddress(Duration timeout, Duration startLogging } if (targetAddress != null) { - AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST; + AddressDetectionState strategy = AddressDetectionState.LOOPBACK; boolean logging = elapsedTimeMillis >= startLoggingAfter.toMillis(); if (logging) { @@ -449,6 +463,9 @@ public InetAddress findConnectingAddress(Duration timeout, Duration startLogging // pick the next strategy switch (strategy) { + case LOOPBACK: + strategy = AddressDetectionState.LOCAL_HOST; + break; case LOCAL_HOST: strategy = AddressDetectionState.ADDRESS; break; diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index 2321355a390bc..ec9cde39eb2f1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.util.Preconditions; @@ -64,6 +65,8 @@ public static Configuration loadConfiguration( ApplicationConstants.Environment.NM_HOST.key()); configuration.setString(JobManagerOptions.ADDRESS, hostname); + configuration.removeConfig(JobManagerOptions.BIND_HOST); + configuration.removeConfig(TaskManagerOptions.BIND_HOST); configuration.setString(RestOptions.ADDRESS, hostname); configuration.setString(RestOptions.BIND_ADDRESS, hostname);