Skip to content

Commit

Permalink
[FLINK-2722] Use InetAddress.getLocalHost() as first approach when de…
Browse files Browse the repository at this point in the history
…tecting the TMs own ip/hostname

This closes apache#1159
  • Loading branch information
rmetzger committed Sep 22, 2015
1 parent c24b8e6 commit a3df109
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 84 deletions.
105 changes: 22 additions & 83 deletions flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class NetUtils {
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
/** Connect from interface returned by InetAddress.getLocalHost() **/
LOCAL_HOST(50),
/** Detect own IP address based on the target IP address. Look for common prefix */
ADDRESS(50),
/** Try to connect on all Interfaces and all their addresses with a low timeout */
Expand All @@ -73,87 +75,6 @@ public int getTimeout() {
}
}

/**
* Find out the TaskManager's own IP address, simple version.
*/
public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;

while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();

while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();

while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();

switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
}
break;

case FAST_CONNECT:
case SLOW_CONNECT:
boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
if (correct) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
break;

case HEURISTIC:
if (LOG.isDebugEnabled()) {
LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
" isLinkLocalAddress:" + i.isLinkLocalAddress() +
" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
}

if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
"loopback address. Using instead " + i.getHostAddress() + " on network " +
"interface " + n.getName() + ".");
return i;
}
break;

default:
throw new RuntimeException("Unknown address detection strategy: " + strategy);
}
}
}
// state control
switch (strategy) {
case ADDRESS:
strategy = AddressDetectionState.FAST_CONNECT;
break;
case FAST_CONNECT:
strategy = AddressDetectionState.SLOW_CONNECT;
break;
case SLOW_CONNECT:
if (!InetAddress.getLocalHost().isLoopbackAddress()) {
LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
"IP address.");
return InetAddress.getLocalHost();
} else {
strategy = AddressDetectionState.HEURISTIC;
break;
}
case HEURISTIC:
throw new RuntimeException("Unable to resolve own inet address by connecting " +
"to address (" + jobManagerAddress + ").");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Defaulting to detection strategy " + strategy);
}
}
}

/**
* Finds the local network address from which this machine can connect to the target
Expand Down Expand Up @@ -191,7 +112,7 @@ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,

// loop while there is time left
while (elapsedTime < maxWaitMillis) {
AddressDetectionState strategy = AddressDetectionState.ADDRESS;
AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;

boolean logging = elapsedTime >= startLoggingAfter;
if (logging) {
Expand All @@ -206,6 +127,9 @@ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,

// pick the next strategy
switch (strategy) {
case LOCAL_HOST:
strategy = AddressDetectionState.ADDRESS;
break;
case ADDRESS:
strategy = AddressDetectionState.FAST_CONNECT;
break;
Expand Down Expand Up @@ -262,6 +186,18 @@ private static InetAddress findAddressUsingStrategy(AddressDetectionState strate
InetSocketAddress targetAddress,
boolean logging) throws IOException
{
// try LOCAL_HOST strategy independent of the network interfaces
if(strategy == AddressDetectionState.LOCAL_HOST) {
InetAddress localhostName = InetAddress.getLocalHost();

if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
return localhostName;
} else {
return null;
}
}

final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();

// for each network interface
Expand Down Expand Up @@ -464,7 +400,7 @@ public InetAddress findConnectingAddress(
}

if (targetAddress != null) {
AddressDetectionState strategy = AddressDetectionState.ADDRESS;
AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;

boolean logging = elapsedTime >= startLoggingAfter.toMillis();
if (logging) {
Expand All @@ -479,6 +415,9 @@ public InetAddress findConnectingAddress(

// pick the next strategy
switch (strategy) {
case LOCAL_HOST:
strategy = AddressDetectionState.ADDRESS;
break;
case ADDRESS:
strategy = AddressDetectionState.FAST_CONNECT;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public void connectToCluster() throws IOException {

// start actor system
LOG.info("Start actor system.");
InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
// find name of own public interface, able to connect to the JM
// try to find address for 2 seconds. log after 400 ms.
InetAddress ownHostname = NetUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
actorSystem = AkkaUtils.createActorSystem(flinkConfig,
new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));

Expand Down

0 comments on commit a3df109

Please sign in to comment.