Skip to content

Commit

Permalink
[FLINK-2821] use custom Akka build to listen on all interfaces
Browse files Browse the repository at this point in the history
This uses Flakka (a custom Akka 2.3 build) to resolve the issue that
the bind address needs to be matching the external address of the
JobManager. With the changes applied, we can now bind to all
interfaces, e.g. via 0.0.0.0 (IPv4) or :: (IPv6).

For this to work properly, the configuration entry
JOB_MANAGER_IPC_ADDRESS now represents the external address of the
JobManager. Consequently, it should not be resolved to an IP address
anymore because it may not be resolvable from within containered
environments. Akka treats this address as the logical address. Any
messages which are not tagged with this address will be received by
the Actor System (because we listen on all interfaces) but will be
dropped subsequently. In addition, we need the external address for
the JobManager to be able to publish it to Zookeeper for HA setups.

Flakka: https://github.com/mxm/flakka
Patch applied: akka/akka#15610

- convert host to lower case
- use consistent format for IPv6 address
- adapt config and test cases
- adapt documentation to clarify the address config entry
- TaskManager: resolve the initial hostname of the StandaloneLeaderRetrievalService

This closes apache#2917.
  • Loading branch information
mxm committed Dec 16, 2016
1 parent e9e6688 commit 27ebdf7
Show file tree
Hide file tree
Showing 35 changed files with 390 additions and 221 deletions.
4 changes: 2 additions & 2 deletions docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The configuration files for the TaskManagers can be different, Flink does not as

- `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.

- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).
- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.

- `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).

Expand Down Expand Up @@ -206,7 +206,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp

The following parameters configure Flink's JobManager and TaskManagers.

- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**).
- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**). **Note:** The address (host name or IP) should be accessible by all nodes including the client.

- `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: **6123**).

Expand Down
4 changes: 2 additions & 2 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ under the License.
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<groupId>com.data-artisans</groupId>
<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws

final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new ProgramInvocationException("Could not create the leader retrieval service", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;

import java.net.InetAddress;
Expand All @@ -39,15 +40,17 @@ public class RemoteExecutorHostnameResolutionTest {

private static final String nonExistingHostname = "foo.bar.com.invalid";
private static final int port = 14451;



@BeforeClass
public static void check() {
checkPreconditions();
}

@Test
public void testUnresolvableHostname1() {

checkPreconditions();


RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
try {
RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
Expand All @@ -65,12 +68,10 @@ public void testUnresolvableHostname1() {
@Test
public void testUnresolvableHostname2() {

checkPreconditions();

try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
Collections.<URL>emptyList(), Collections.<URL>emptyList());
try {
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,63 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
import org.junit.BeforeClass;
import org.junit.Test;

import java.net.InetAddress;
import java.net.UnknownHostException;

import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.junit.Assume.assumeTrue;

/**
* Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
* and does not fail with another exception
*/
public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {

private static final String nonExistingHostname = "foo.bar.com.invalid";


@BeforeClass
public static void check() {
checkPreconditions();
}

/*
* Tests that the StandaloneLeaderRetrievalService resolves host names if specified.
*/
@Test
public void testUnresolvableHostname1() {

checkPreconditions();


try {
Configuration config = new Configuration();

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config);
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
System.err.println("Shouldn't throw an exception!");
e.printStackTrace();
fail(e.getMessage());
}
}

/*
* Tests that the StandaloneLeaderRetrievalService does not resolve host names by default.
*/
@Test
public void testUnresolvableHostname2() {

checkPreconditions();

try {
Configuration config = new Configuration();

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);

LeaderRetrievalUtils.createLeaderRetrievalService(config);
fail("This should fail with an UnknownHostException");
LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
fail("This should fail with an IllegalConfigurationException");
}
catch (UnknownHostException e) {
// that is what we want!
Expand All @@ -84,7 +89,7 @@ public void testUnresolvableHostname2() {
fail(e.getMessage());
}
}

private static void checkPreconditions() {
// the test can only work if the invalid URL cannot be resolves
// some internet providers resolve unresolvable URLs to navigational aid servers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,7 +64,6 @@

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
Expand Down Expand Up @@ -333,7 +333,7 @@ private ActorRef getJobManager() throws IOException {
}

return JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
NetUtils.unresolvedHostAndPortToNormalizedString(this.jobManagerHost, this.jobManagerPort),
actorSystem, AkkaUtils.getLookupTimeout(configuration));
}

Expand Down
79 changes: 75 additions & 4 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.flink.annotation.Internal;

import org.apache.flink.configuration.IllegalConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.net.util.IPAddressUtil;

import java.io.IOException;
import java.net.Inet4Address;
Expand All @@ -40,6 +42,9 @@
public class NetUtils {

private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);

/** The wildcard address to listen on all interfaces (either 0.0.0.0 or ::) */
private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();

/**
* Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
Expand Down Expand Up @@ -111,7 +116,55 @@ public static int getAvailablePort() {
// ------------------------------------------------------------------------
// Encoding of IP addresses for URLs
// ------------------------------------------------------------------------


/**
* Returns an address in a normalized format for Akka.
* When an IPv6 address is specified, it normalizes the IPv6 address to avoid
* complications with the exact URL match policy of Akka.
* @param host The hostname, IPv4 or IPv6 address
* @return host which will be normalized if it is an IPv6 address
*/
public static String unresolvedHostToNormalizedString(String host) {
// Return loopback interface address if host is null
// This represents the behavior of {@code InetAddress.getByName } and RFC 3330
if (host == null) {
host = InetAddress.getLoopbackAddress().getHostAddress();
} else {
host = host.trim().toLowerCase();
}

// normalize and valid address
if (IPAddressUtil.isIPv6LiteralAddress(host)) {
byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);
host = getIPv6UrlRepresentation(ipV6Address);
} else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {
try {
// We don't allow these in hostnames
Preconditions.checkArgument(!host.startsWith("."));
Preconditions.checkArgument(!host.endsWith("."));
Preconditions.checkArgument(!host.contains(":"));
} catch (Exception e) {
throw new IllegalConfigurationException("The configured hostname is not valid", e);
}
}

return host;
}

/**
* Returns a valid address for Akka. It returns a String of format 'host:port'.
* When an IPv6 address is specified, it normalizes the IPv6 address to avoid
* complications with the exact URL match policy of Akka.
* @param host The hostname, IPv4 or IPv6 address
* @param port The port
* @return host:port where host will be normalized if it is an IPv6 address
*/
public static String unresolvedHostAndPortToNormalizedString(String host, int port) {
Preconditions.checkArgument(port >= 0 && port < 65536,
"Port is not within the valid range,");
return unresolvedHostToNormalizedString(host) + ":" + port;
}

/**
* Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
* have the proper formatting to be included in URLs.
Expand All @@ -137,7 +190,7 @@ else if (address instanceof Inet6Address) {
/**
* Encodes an IP address and port to be included in URL. in particular, this method makes
* sure that IPv6 addresses have the proper formatting to be included in URLs.
*
*
* @param address The address to be included in the URL.
* @param port The port for the URL address.
* @return The proper URL string encoded IP address and port.
Expand Down Expand Up @@ -176,14 +229,24 @@ public static String hostAndPortToUrlString(String host, int port) throws Unknow

/**
* Creates a compressed URL style representation of an Inet6Address.
*
*
* <p>This method copies and adopts code from Google's Guava library.
* We re-implement this here in order to reduce dependency on Guava.
* The Guava library has frequently caused dependency conflicts in the past.
*/
private static String getIPv6UrlRepresentation(Inet6Address address) {
return getIPv6UrlRepresentation(address.getAddress());
}

/**
* Creates a compressed URL style representation of an Inet6Address.
*
* <p>This method copies and adopts code from Google's Guava library.
* We re-implement this here in order to reduce dependency on Guava.
* The Guava library has frequently caused dependency conflicts in the past.
*/
private static String getIPv6UrlRepresentation(byte[] addressBytes) {
// first, convert bytes to 16 bit chunks
byte[] addressBytes = address.getAddress();
int[] hextets = new int[8];
for (int i = 0; i < hextets.length; i++) {
hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);
Expand Down Expand Up @@ -309,6 +372,14 @@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator
return null;
}

/**
* Returns the wildcard address to listen on all interfaces.
* @return Either 0.0.0.0 or :: depending on the IP setup.
*/
public static String getWildcardIPAddress() {
return WILDCARD_ADDRESS;
}

public interface SocketFactory {
ServerSocket createSocket(int port) throws IOException;
}
Expand Down
59 changes: 59 additions & 0 deletions flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
Expand Down Expand Up @@ -162,4 +163,62 @@ public void testFreePortRangeUtility() {
error = null;

}

@Test
public void testFormatAddress() throws UnknownHostException {
{
// IPv4
String host = "1.2.3.4";
int port = 42;
Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
}
{
// IPv6
String host = "2001:0db8:85a3:0000:0000:8a2e:0370:7334";
int port = 42;
Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
}
{
// Hostnames
String host = "somerandomhostname";
int port = 99;
Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
}
{
// Whitespace
String host = " somerandomhostname ";
int port = 99;
Assert.assertEquals(host.trim() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
}
{
// Illegal hostnames
String host = "illegalhost.";
int port = 42;
try {
NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
fail();
} catch (Exception ignored) {}
// Illegal hostnames
host = "illegalhost:fasf";
try {
NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
fail();
} catch (Exception ignored) {}
}
{
// Illegal port ranges
String host = "1.2.3.4";
int port = -1;
try {
NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
fail();
} catch (Exception ignored) {}
}
{
// lower case conversion of hostnames
String host = "CamelCaseHostName";
int port = 99;
Assert.assertEquals(host.toLowerCase() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
}
}
}
Loading

0 comments on commit 27ebdf7

Please sign in to comment.