From 5a29a2e9bf5c0d7223805e3a54a652c44d9b5bc3 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Wed, 23 Feb 2011 16:33:27 +0100 Subject: [PATCH 1/8] Modifications to DiscoveryService --- .../nephele/discovery/DiscoveryService.java | 592 ++++++++++-------- .../nephele/jobmanager/JobManager.java | 21 +- .../nephele/taskmanager/TaskManager.java | 24 +- .../discovery/DiscoveryServiceTest.java | 47 +- 4 files changed, 381 insertions(+), 303 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java index b1782affbdd1f..8d271bbc657d2 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -30,8 +29,6 @@ import java.util.Enumeration; import java.util.HashSet; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,11 +46,6 @@ * The discovery service uses the discoveryservice.magicnumber configuration parameter. It needs to be set * to any number. Task managers discover the job manager only if their magic number matches. This allows running two * Nephele setups on the same cluster without interference of the {@link DiscoveryService}s. - *

- * On hosts with several network interfaces or IP addresses, the servicenetwork can be used to describe to - * which IP the services shall be bound. A node with IP addresses 130.149.3.99/255.255.255.192 and - * 192.168.198.3/255.255.0.0 could specify for example 130.149.3.64 or 192.168.0.0 as the service network. In fact also - * 130.149.3.99 and 192.168.198.3 would work. * * @author warneke * @author Dominic Battre @@ -85,6 +77,11 @@ public class DiscoveryService implements Runnable { */ private static final String MAGICNUMBER_KEY = "discoveryservice.magicnumber"; + /** + * The default magic number. + */ + private static final int DEFAULT_MAGICNUMBER_VALUE = 0; + /** * The log object used for debugging. */ @@ -95,6 +92,11 @@ public class DiscoveryService implements Runnable { */ private static DiscoveryService discoveryService = null; + /** + * The network address the IPC is bound to, possibly null. + */ + private final InetAddress ipcAddress; + /** * The network port that is announced for the job manager's IPC service. */ @@ -105,41 +107,73 @@ public class DiscoveryService implements Runnable { */ private Thread listeningThread = null; - private final static Pattern pingPattern = Pattern.compile("PING (\\d+)"); - - private final static Pattern pongPattern = Pattern.compile("PONG (\\d+)"); - /** * The datagram socket of the discovery server. */ private DatagramSocket serverSocket = null; - private static final boolean USEIPV6 = "true".equals(System.getProperty("java.net.preferIPv4Stack")) ? false : true; + /** + * Flag indicating whether to use IPv6 or not. + */ + private static final boolean USE_IPV6 = "true".equals(System.getProperty("java.net.preferIPv4Stack")) ? false + : true; + + /** + * ID for job manager lookup request packets. + */ + private static final int JM_LOOKUP_REQUEST_ID = 0; + + /** + * ID for job manager lookup reply packets. + */ + private static final int JM_LOOKUP_REPLY_ID = 1; + + /** + * ID for task manager address request packets. 
+ */ + private static final int TM_ADDRESS_REQUEST_ID = 2; + + /** + * ID for task manager address reply packets. + */ + private static final int TM_ADDRESS_REPLY_ID = 3; + + /** + * The default size of response datagram packets. + */ + private static final int RESPONSE_PACKET_SIZE = 64; /** * Constructs a new {@link DiscoveryService} object and stores * the job manager's IPC port. * + * @param ipcAddress + * the network address the IPC is bound to, possibly null * @param ipcPort + * the network port that is announced for the job manager's IPC service */ - private DiscoveryService(int ipcPort) { + private DiscoveryService(final InetAddress ipcAddress, final int ipcPort) { + + this.ipcAddress = ipcAddress; this.ipcPort = ipcPort; } /** * Starts a new discovery service. * + * @param ipcAddress + * the network address the IPC is bound to, possibly null * @param ipcPort - * the network port that is announced for - * the job manager's IPC service. + * the network port that is announced for the job manager's IPC service. 
* @throws DiscoveryException * thrown if the discovery service could not be started because * of network difficulties */ - public static synchronized void startDiscoveryService(int ipcPort) throws DiscoveryException { + public static synchronized void startDiscoveryService(final InetAddress ipcAddress, final int ipcPort) + throws DiscoveryException { if (discoveryService == null) { - discoveryService = new DiscoveryService(ipcPort); + discoveryService = new DiscoveryService(ipcAddress, ipcPort); discoveryService.startService(); } } @@ -165,7 +199,7 @@ public static synchronized void stopDiscoveryService() { private void startService() throws DiscoveryException { try { - this.serverSocket = new DatagramSocket(DISCOVERYPORT, getServiceAddress()); + this.serverSocket = new DatagramSocket(DISCOVERYPORT, this.ipcAddress); } catch (SocketException e) { throw new DiscoveryException(e.toString()); } @@ -190,47 +224,140 @@ private void stopService() { } /** - * Creates a new PING Message. - *

- * The message follows the format "PING X", where X is a magic number configured in "discoveryservice.magicnumber" - * or 0 if no number is specified in the configuration. The magic number allows to execute several Nephele instances - * in the same network without IP traffic isolation. Without such a mechanism, one Task Manager might register at - * the wrong Discovery Service. + * Creates a new job manager lookup request packet. * - * @return new PING datagram. + * @return a new job manager lookup request packet */ - private static DatagramPacket createPingPacket() { - int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, 0); - byte[] bytes = ("PING " + magicNumber).getBytes(); + private static DatagramPacket createJobManagerLookupRequestPacket() { + + final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); + final byte[] bytes = new byte[8]; + integerToByteArray(magicNumber, 0, bytes); + integerToByteArray(JM_LOOKUP_REQUEST_ID, 4, bytes); + return new DatagramPacket(bytes, bytes.length); } /** - * Returns whether the {@link DatagramPacket} contains a PING Message - * that is addressed to us. + * Creates a new job manager lookup reply packet. * - * @see {@link #createPingPacket()} for an explanation of the message format - * @param packet - * Received {@link DatagramPacket} that might contain a PING message - * @return true if the {@link DatagramPacket} contains a PING message addressed to us. 
+ * @return a new job manager lookup reply packet + */ + private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPort) { + + final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); + final byte[] bytes = new byte[12]; + integerToByteArray(magicNumber, 0, bytes); + integerToByteArray(JM_LOOKUP_REPLY_ID, 4, bytes); + integerToByteArray(ipcPort, 8, bytes); + + return new DatagramPacket(bytes, bytes.length); + } + + /** + * Creates a new task manager address request packet. + * + * @return a new task manager address request packet + */ + private static DatagramPacket createTaskManagerAddressRequestPacket() { + + final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); + final byte[] bytes = new byte[8]; + integerToByteArray(magicNumber, 0, bytes); + integerToByteArray(TM_ADDRESS_REQUEST_ID, 4, bytes); + + return new DatagramPacket(bytes, bytes.length); + } + + /** + * Creates a new task manager address reply packet. + * + * @param taskManagerAddress + * the address of the task manager which sent the request + * @return a new task manager address reply packet */ - private static boolean isPingForUs(DatagramPacket packet) { + private static DatagramPacket createTaskManagerAddressReplyPacket(final InetAddress taskManagerAddress) { + + final byte[] addr = taskManagerAddress.getAddress(); + final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); + final byte[] bytes = new byte[12 + addr.length]; + integerToByteArray(magicNumber, 0, bytes); + integerToByteArray(TM_ADDRESS_REPLY_ID, 4, bytes); + integerToByteArray(addr.length, 8, bytes); + System.arraycopy(addr, 0, bytes, 12, addr.length); + + return new DatagramPacket(bytes, bytes.length); + } + + /** + * Returns the network address with which the task manager shall announce itself to the job manager. 
To determine + * the address this method exchanges packets with the job manager. + * + * @param jobManagerAddress + * the address of the job manager + * @return the address with which the task manager shall announce itself to the job manager + * @throws DiscoveryException + * thrown if an error occurs during the packet exchange + */ + public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddress) throws DiscoveryException { + + InetAddress taskManagerAddress = null; + + DatagramSocket socket = null; try { - String content = new String(packet.getData(), packet.getOffset(), packet.getLength()); - Matcher m = pingPattern.matcher(content); + socket = new DatagramSocket(); + LOG.debug("Setting socket timeout to " + CLIENTSOCKETTIMEOUT); + socket.setSoTimeout(CLIENTSOCKETTIMEOUT); + + final DatagramPacket responsePacket = new DatagramPacket(new byte[RESPONSE_PACKET_SIZE], + RESPONSE_PACKET_SIZE); + + for (int retries = 0; retries < DISCOVERFAILURERETRIES; retries++) { + + final DatagramPacket addressRequest = createTaskManagerAddressRequestPacket(); + addressRequest.setAddress(jobManagerAddress); + addressRequest.setPort(DISCOVERYPORT); - if (m.matches()) { - final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, 0); - if (Integer.parseInt(m.group(1)) == magicNumber) { - return true; + LOG.debug("Sending Task Manager address request to " + addressRequest.getSocketAddress()); + socket.send(addressRequest); + + try { + socket.receive(responsePacket); + } catch (SocketTimeoutException ste) { + LOG.warn("Timeout wainting for address reply. 
Retrying..."); + continue; + } + + if (!isPacketForUs(responsePacket)) { + LOG.warn("Received packet which is not destined to this Nephele setup"); + continue; } + + final int packetTypeID = getPacketTypeID(responsePacket); + if (packetTypeID != TM_ADDRESS_REPLY_ID) { + LOG.warn("Received response of unknown type " + packetTypeID + ", discarding..."); + continue; + } + + taskManagerAddress = extractInetAddress(responsePacket); + break; } - } catch (Exception e) { - LOG.error("Error parsing ping", e); + + } catch (IOException ioe) { + throw new DiscoveryException(StringUtils.stringifyException(ioe)); + } finally { + if (socket != null) { + socket.close(); + } + } + + if (taskManagerAddress == null) { + throw new DiscoveryException("Unable to obtain task manager address"); } - return false; + + return taskManagerAddress; } /** @@ -244,16 +371,6 @@ private static boolean isPingForUs(DatagramPacket packet) { */ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException { - /* - * try { - * InetSocketAddress result = new InetSocketAddress(InetAddress.getByName("192.168.2.111"),6123); - * return result; - * } catch (UnknownHostException e) { - * // TODO Auto-generated catch block - * e.printStackTrace(); - * throw new DiscoveryException("Unable toooo discoer JobManager via IP broadcast!"); - * } - */ InetSocketAddress jobManagerAddress = null; DatagramSocket socket = null; @@ -271,26 +388,38 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException LOG.debug("Setting socket timeout to " + CLIENTSOCKETTIMEOUT); socket.setSoTimeout(CLIENTSOCKETTIMEOUT); - final DatagramPacket pongBuffer = new DatagramPacket(new byte[100], 100); + final DatagramPacket responsePacket = new DatagramPacket(new byte[RESPONSE_PACKET_SIZE], + RESPONSE_PACKET_SIZE); for (int retries = 0; retries < DISCOVERFAILURERETRIES; retries++) { for (InetAddress broadcast : targetAddresses) { - final DatagramPacket ping = createPingPacket(); - 
ping.setAddress(broadcast); - ping.setPort(DISCOVERYPORT); - LOG.debug("Sending discovery request to " + ping.getSocketAddress()); - socket.send(ping); + final DatagramPacket lookupRequest = createJobManagerLookupRequestPacket(); + lookupRequest.setAddress(broadcast); + lookupRequest.setPort(DISCOVERYPORT); + LOG.debug("Sending discovery request to " + lookupRequest.getSocketAddress()); + socket.send(lookupRequest); } try { - socket.receive(pongBuffer); + socket.receive(responsePacket); } catch (SocketTimeoutException ste) { LOG.debug("Timeout wainting for discovery reply. Retrying..."); continue; } - final int ipcPort = extractIpcPort(pongBuffer); + if (!isPacketForUs(responsePacket)) { + LOG.debug("Received packet which is not destined to this Nephele setup"); + continue; + } + + final int packetTypeID = getPacketTypeID(responsePacket); + if (packetTypeID != JM_LOOKUP_REPLY_ID) { + LOG.error("Received unexpected packet type " + packetTypeID + ", discarding... "); + continue; + } + + final int ipcPort = extractIpcPort(responsePacket); // TODO: This condition helps to deal with legacy implementations of the DiscoveryService if (ipcPort < 0) { continue; @@ -298,20 +427,21 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException // Replace port from discovery service with the actual RPC port // of the job manager - if (USEIPV6) { + if (USE_IPV6) { // TODO: No connection possible unless we remove the scope identifier - if (pongBuffer.getAddress() instanceof Inet6Address) { + if (responsePacket.getAddress() instanceof Inet6Address) { try { - jobManagerAddress = new InetSocketAddress(InetAddress.getByAddress(pongBuffer.getAddress() + jobManagerAddress = new InetSocketAddress(InetAddress.getByAddress(responsePacket + .getAddress() .getAddress()), ipcPort); } catch (UnknownHostException e) { throw new DiscoveryException(StringUtils.stringifyException(e)); } } else { - throw new DiscoveryException(pongBuffer.getAddress() + " is not a valid IPv6 
address"); + throw new DiscoveryException(responsePacket.getAddress() + " is not a valid IPv6 address"); } } else { - jobManagerAddress = new InetSocketAddress(pongBuffer.getAddress(), ipcPort); + jobManagerAddress = new InetSocketAddress(responsePacket.getAddress(), ipcPort); } LOG.debug("Discovered job manager at " + jobManagerAddress); break; @@ -333,29 +463,63 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException return jobManagerAddress; } + /** + * Extracts an IPC port from the given datagram packet. The datagram packet must be of the type + * JM_LOOKUP_REPLY_PACKET_ID. + * + * @param packet + * the packet to extract the IPC port from. + * @return the extracted IPC port or -1 if the port could not be extracted + */ private static int extractIpcPort(DatagramPacket packet) { - final String content = new String(packet.getData(), packet.getOffset(), packet.getLength()); + final byte[] data = packet.getData(); - Matcher m = pongPattern.matcher(content); + if (data == null) { + return -1; + } - if (!m.matches()) { - LOG.error("DiscoveryService cannot extract port from " + content); + if (packet.getLength() < 12) { return -1; } - LOG.debug("Received response from DiscoveryService: " + content); + return byteArrayToInteger(data, 8); + } + + /** + * Extracts an {@link InetAddress} object from the given datagram packet. The datagram packet must be of the type + * TM_ADDRESS_REPLY_PACKET_ID. 
+ * + * @param packet + * the packet to extract the address from + * @return the extracted address or null if it could not be extracted + */ + private static InetAddress extractInetAddress(DatagramPacket packet) { + + final byte[] data = packet.getData(); + + if (data == null) { + return null; + } + + if (packet.getLength() < 16) { + return null; + } + + final int len = byteArrayToInteger(data, 8); + + final byte[] addr = new byte[len]; + System.arraycopy(data, 12, addr, 0, len); - int ipcPort = 0; + InetAddress inetAddress = null; try { - ipcPort = Integer.parseInt(m.group(1)); - } catch (NumberFormatException e) { - LOG.error(StringUtils.stringifyException(e)); - return -1; + inetAddress = InetAddress.getByAddress(addr); + } catch (UnknownHostException e) { + return null; } - return ipcPort; + return inetAddress; } /** @@ -378,8 +542,6 @@ private static Set getBroadcastAddresses() { return broadcastAddresses; } - final InetAddress serviceAddress = getServiceAddress(); - while (ie.hasMoreElements()) { NetworkInterface nic = ie.nextElement(); try { @@ -391,7 +553,7 @@ private static Set getBroadcastAddresses() { for (InterfaceAddress adr : nic.getInterfaceAddresses()) { // collect all broadcast addresses - if (USEIPV6) { + if (USE_IPV6) { try { final InetAddress interfaceAddress = adr.getAddress(); if (interfaceAddress instanceof Inet6Address) { @@ -406,9 +568,7 @@ private static Set getBroadcastAddresses() { } } else { final InetAddress broadcast = adr.getBroadcast(); - if ((broadcast != null && serviceAddress == null) - || (broadcast != null && DiscoveryService.onSameNetwork(serviceAddress, broadcast, adr - .getNetworkPrefixLength()))) { + if (broadcast != null) { broadcastAddresses.add(broadcast); } } @@ -423,224 +583,146 @@ private static Set getBroadcastAddresses() { } /** - * Calculates the bit vector of the network prefix for a specific addressLength and - * networkPrefixLength - * - * @param addressLength - * the length (in bits) of the network address - * 
@param networkPrefixLength - * the length (in bits) of network address prefix - * @return bit vector representing the prefix of the network address + * Server side implementation of Discovery Service. */ - static byte[] getNetworkPrefix(int addressLength, int networkPrefixLength) { - - if (networkPrefixLength <= 0 || networkPrefixLength >= addressLength) { - throw new IllegalArgumentException("Invalid networkPrefixLength"); - } - - byte[] netmask = new byte[addressLength / 8]; - - for (int byteNr = 0; byteNr < netmask.length; ++byteNr) { - // create netmask for the current byte + @Override + public void run() { - // how many '1's remain for this byte? - // e.g. if networkPrefixLength was 11, - // then we have 8x'1' in the first byte - // and 3x'1' followed by 5x'0' in the second byte - int onesInThisByte = Math.min(networkPrefixLength, 8); - networkPrefixLength -= onesInThisByte; + final DatagramPacket requestPacket = new DatagramPacket(new byte[64], 64); - // calculate bit pattern for current byte. + while (!Thread.interrupted()) { - // suppose onesInThisByte is 3 - // (1<<5) = 0010 0000 - // (1<<5)-1 = 0001 1111 - // ~(1<<5)-1) = 1110 0000 + try { + this.serverSocket.receive(requestPacket); - netmask[byteNr] = (byte) ~((1 << (8 - onesInThisByte)) - 1); - } + if (!isPacketForUs(requestPacket)) { + LOG.debug("Received request packet which is not destined to this Nephele setup"); + continue; + } - return netmask; - } + final int packetTypeID = getPacketTypeID(requestPacket); + if (packetTypeID == JM_LOOKUP_REQUEST_ID) { - /** - * Returns for a given networkPrefixLength whether a and b are in the same - * network. - * - * @param a - * first IP Address - * @param b - * second IP Address - * @param networkPrefixLength - * number of bits in IP addresses belonging to network id - * @return true if a and b belong to the same network. 
- */ - static boolean onSameNetwork(InetAddress a, InetAddress b, int networkPrefixLength) { + LOG.debug("Received job manager lookup request from " + requestPacket.getSocketAddress()); + final DatagramPacket responsePacket = createJobManagerLookupReplyPacket(this.ipcPort); + responsePacket.setAddress(requestPacket.getAddress()); + responsePacket.setPort(requestPacket.getPort()); - if ((a == null) || (b == null)) { - return false; - } + this.serverSocket.send(responsePacket); - // convert both addresses to byte array - byte[] A = a.getAddress(); - byte[] B = b.getAddress(); + } else if (packetTypeID == TM_ADDRESS_REQUEST_ID) { + LOG.debug("Received task manager address request from " + requestPacket.getSocketAddress()); + final DatagramPacket responsePacket = createTaskManagerAddressReplyPacket(requestPacket + .getAddress()); + responsePacket.setAddress(requestPacket.getAddress()); + responsePacket.setPort(requestPacket.getPort()); - // Compatible addresses must have the same length - if (A.length != B.length) { - return false; - } + this.serverSocket.send(responsePacket); - byte[] prefix = getNetworkPrefix(A.length * 8, networkPrefixLength); + } else { + LOG.error("Received packet of unknown type " + packetTypeID + ", discarding..."); + } - // check byte wise whether (A & netmask) = (B & netmask). - for (int byteNr = 0; byteNr < A.length; ++byteNr) { - if ((A[byteNr] & prefix[byteNr]) != (B[byteNr] & prefix[byteNr])) { - return false; + } catch (SocketTimeoutException ste) { + LOG.debug("Discovery service: socket timeout"); + } catch (IOException ioe) { + LOG.error("Discovery service stopped working with IOException:\n" + ioe.toString()); + break; } } - return true; + + // Close the socket finally + this.serverSocket.close(); } /** - * This function returns the IP address to which services shall bind. - *

- * If the configuration file contains an entry like servicenetwork=192.168.178.0 (or a respective IPv6 - * address), this function returns the first IP bound to a network interface that belongs to the same network as the - * specified servicenetwork. - *

- * If the configuration file does not contain a servicenetwork entry, the function returns - * null. As a result, services will be bound to any/all local addresses. If no valid IP address can be - * found, a {@link SocketException} is thrown. + * Serializes and writes the given integer number to the provided byte array. * - * @return {@link InetAddress} to which services shall bind. + * @param integerToSerialize + * the integer number of serialize + * @param offset + * the offset at which to start writing inside the byte array + * @param byteArray + * the byte array to write to */ - public static InetAddress getServiceAddress() { + private static void integerToByteArray(final int integerToSerialize, final int offset, final byte[] byteArray) { - final String serviceNetwork = GlobalConfiguration.getString("servicenetwork", null); - - InetAddress serviceNetworkAddress = null; - - if (serviceNetwork == null) { - return null; + for (int i = 0; i < 4; ++i) { + final int shift = i << 3; // i * 8 + byteArray[(offset + 3) - i] = (byte) ((integerToSerialize & (0xff << shift)) >>> shift); } + } - try { - serviceNetworkAddress = InetAddress.getByName(serviceNetwork); - - if ((serviceNetworkAddress instanceof Inet4Address) && USEIPV6) { - throw new UnknownHostException(); - } + /** + * Reads and deserializes an integer number from the given byte array. 
+ * + * @param byteArray + * the byte array to read from + * @param offset + * the offset at which to start reading the byte array + * @return the deserialized integer number + */ + private static int byteArrayToInteger(final byte[] byteArray, final int offset) { - if ((serviceNetworkAddress instanceof Inet6Address) && !USEIPV6) { - throw new UnknownHostException(); - } + int integer = 0; - } catch (UnknownHostException e) { - if (USEIPV6) { - LOG.error("Configured service network is not a valid IPv6 address"); - } else { - LOG.error("Configured service network is not a valid IPv4 address"); - } + for (int i = 0; i < 4; ++i) { + System.out.print(byteArray[i]); } - // If the service network address could not be parsed, we fall back to all interfaces - if (serviceNetworkAddress == null) { - return null; + for (int i = 0; i < 4; ++i) { + integer |= (byteArray[(offset + 3) - i] & 0xff) << (i << 3); } - return findLocalAddressOnSameNetwork(serviceNetworkAddress); + return integer; } /** - * Server side implementation of Discovery Service. + * Extracts the datagram packet's magic number and checks it matches with the local magic number. 
+ * + * @param packet + * the packet to check + * @return true if the packet carries the magic number expected by the local service, otherwise + * false */ - @Override - public void run() { + private static boolean isPacketForUs(final DatagramPacket packet) { - final DatagramPacket ping = new DatagramPacket(new byte[100], 100); - final byte[] PONG = ("PONG " + Integer.toString(this.ipcPort)).getBytes(); - final DatagramPacket pong = new DatagramPacket(PONG, PONG.length); + final byte[] data = packet.getData(); - while (!Thread.interrupted()) { - - try { - this.serverSocket.receive(ping); + if (data == null) { + return false; + } - if (isPingForUs(ping)) { - LOG.debug("Received ping from " + ping.getSocketAddress()); - pong.setAddress(ping.getAddress()); - pong.setPort(ping.getPort()); + if (packet.getLength() < 4) { + return false; + } - this.serverSocket.send(pong); - } else { - LOG.debug("Received ping for somebody else from " + ping.getSocketAddress()); - } - } catch (SocketTimeoutException ste) { - LOG.debug("Discovery service: socket timeout"); - } catch (IOException ioe) { - LOG.error("Discovery service stopped working with IOException:\n" + ioe.toString()); - break; - } + if (byteArrayToInteger(data, 0) != GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE)) { + return false; } - // Close the socket finally - this.serverSocket.close(); + return true; } /** - * Finds a local network address that is on the same network as candidateAddress or null - * if no such - * address is attached to one of the host's interfaces. + * Extracts the packet type ID from the given datagram packet. 
* - * @param candidateAddress - * the candidate address - * @return a local network address on the same network as candidateAddress or null if no - * such address can be found + * @param packet + * the packet to extract the type ID from + * @return the extracted packet type ID or -1 if the ID could not be extracted */ - public static InetAddress findLocalAddressOnSameNetwork(InetAddress candidateAddress) { + private static int getPacketTypeID(final DatagramPacket packet) { - if (candidateAddress == null) { - LOG.debug("candidateAddress is null"); - return null; - } + final byte[] data = packet.getData(); - // If candidate address is a loopback address, simply return the address - if (candidateAddress.isLoopbackAddress()) { - return candidateAddress; + if (data == null) { + return -1; } - // iterate over all network interfaces and return first address that - // is in the same network as candidateAddress - try { - Enumeration ie = NetworkInterface.getNetworkInterfaces(); - - while (ie.hasMoreElements()) { - NetworkInterface i = ie.nextElement(); - if (!i.isUp()) { - continue; - } - - for (InterfaceAddress adr : i.getInterfaceAddresses()) { - InetAddress address = adr.getAddress(); - if ((address instanceof Inet4Address) && USEIPV6) { - continue; - } - - if ((address instanceof Inet6Address) && !USEIPV6) { - continue; - } - - final int networkPrefixLength = adr.getNetworkPrefixLength(); - - if (i.isPointToPoint() == false && onSameNetwork(address, candidateAddress, networkPrefixLength)) { - return address; - } - } - } - } catch (SocketException e) { - LOG.error(e); + if (packet.getLength() < 8) { + return -1; } - return null; + return byteArrayToInteger(data, 4); } } diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java index 157100dfcf4f3..3741b2655c6f5 100644 --- 
a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java @@ -34,7 +34,9 @@ package eu.stratosphere.nephele.jobmanager; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -153,13 +155,26 @@ public JobManager(String configDir, String executionMode) { // First, try to load global configuration GlobalConfiguration.loadConfiguration(configDir); + final String ipcAddressString = GlobalConfiguration + .getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + + InetAddress ipcAddress = null; + if (ipcAddressString != null) { + try { + ipcAddress = InetAddress.getByName(ipcAddressString); + } catch (UnknownHostException e) { + LOG.error("Cannot convert " + ipcAddressString + " to an IP address: " + + StringUtils.stringifyException(e)); + System.exit(FAILURERETURNCODE); + } + } + final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - ; // First of all, start discovery manager try { - DiscoveryService.startDiscoveryService(ipcPort); + DiscoveryService.startDiscoveryService(ipcAddress, ipcPort); } catch (DiscoveryException e) { LOG.error("Cannot start discovery manager: " + StringUtils.stringifyException(e)); System.exit(FAILURERETURNCODE); @@ -169,7 +184,7 @@ public JobManager(String configDir, String executionMode) { this.recommendedClientPollingInterval = GlobalConfiguration.getInteger("jobclient.polling.internval", 5); // Determine own RPC address - final InetSocketAddress rpcServerAddress = new InetSocketAddress(DiscoveryService.getServiceAddress(), ipcPort); + final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort); // Start job manager's IPC server try { diff 
--git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 76604b749a161..dd7d446ddb021 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -187,12 +187,15 @@ public TaskManager(String configDir) throws Exception { final int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); - final InetAddress taskManagerAnnounceAddress = DiscoveryService.findLocalAddressOnSameNetwork(jobManagerAddress - .getAddress()); - final InetSocketAddress taskManagerBindAddress = new InetSocketAddress(DiscoveryService.getServiceAddress(), - ipcPort); + InetAddress taskManagerAddress = null; - this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAnnounceAddress, ipcPort, dataPort); + try { + taskManagerAddress = DiscoveryService.getTaskManagerAddress(jobManagerAddress.getAddress()); + } catch (DiscoveryException e) { + throw new Exception("Failed to initialize discovery service. 
" + e.getMessage(), e); + } + + this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort); LOG.info("Announcing connection information " + this.localInstanceConnectionInfo + " to job manager"); @@ -221,8 +224,7 @@ public TaskManager(String configDir) throws Exception { // Start local RPC server Server taskManagerServer = null; try { - taskManagerServer = RPC.getServer(this, taskManagerBindAddress.getHostName(), taskManagerBindAddress - .getPort(), handlerCount, false); + taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostName(), ipcPort, handlerCount, false); taskManagerServer.start(); } catch (IOException e) { LOG.error(StringUtils.stringifyException(e)); @@ -676,7 +678,7 @@ private ChannelType hasCommonOutputChannelType(Environment environment) { * the {@link Environment} of the task to be unregistered */ private void unregisterTask(ExecutionVertexID id, Environment environment) { - + // Unregister channels for (int i = 0; i < environment.getNumberOfOutputGates(); i++) { unregisterOutputChannels(environment.getOutputGate(i)); @@ -700,9 +702,9 @@ private void unregisterTask(ExecutionVertexID id, Environment environment) { if (this.memoryManager != null) { this.memoryManager.releaseAll(environment.getInvokable()); } - - //TODO: Unregister from IO manager here - + + // TODO: Unregister from IO manager here + // Check if there are still vertices running that belong to the same job int numberOfVerticesBelongingToThisJob = 0; synchronized (this.runningTasks) { diff --git a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java index 459fc9e4e1945..e4d8af5795106 100644 --- a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java +++ b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java @@ 
-15,42 +15,21 @@ package eu.stratosphere.nephele.discovery; -import org.junit.Test; - +/** + * This class contains tests for the {@link DiscoveryService} class. + * + * @author warneke + * + */ public class DiscoveryServiceTest { - @Test - public void testToNetmask() { - /* - * assertArrayEquals( - * new byte[] {0, 0, 0, 0}, - * DiscoveryService.toNetmask(0) ); - * assertArrayEquals( - * new byte[] {(byte)255, 0, 0, 0}, - * DiscoveryService.toNetmask(8) ); - * assertArrayEquals( - * new byte[] {(byte)255, (byte)255, 0, 0}, - * DiscoveryService.toNetmask(16) ); - * assertArrayEquals( - * new byte[] {(byte)255, (byte)255, (byte)255, 0}, - * DiscoveryService.toNetmask(24) ); - * assertArrayEquals( - * new byte[] {(byte)255, (byte)255, (byte)255, (byte)255}, - * DiscoveryService.toNetmask(32) ); - * assertArrayEquals( - * new byte[] {(byte)255, (byte)224, 0, 0}, - * DiscoveryService.toNetmask(8+3) ); - */ - } - - @Test - public void testOnSameNetwork() throws Exception { - /* - * Inet4Address a = (Inet4Address)InetAddress.getByAddress(new byte[] {(byte)192, (byte)168, (byte)0, (byte)1}); - * Inet4Address b = (Inet4Address)InetAddress.getByAddress(new byte[] {(byte)192, (byte)168, (byte)1, (byte)1}); - * assertTrue( DiscoveryService.onSameNetwork(a, b, 16)); - * assertFalse( DiscoveryService.onSameNetwork(a, b, 24)); - */ +/* @BeforeClass + public static void startService() { + } + @AfterClass + public static void startService() { + + }*/ } From 34070726f6493cb423af3c3697a8d848c753a596 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Fri, 25 Feb 2011 20:26:03 +0100 Subject: [PATCH 2/8] DiscoveryService now works if JobManager and TaskManager are in different subnets --- .../nephele/client/JobClient.java | 9 +- .../configuration/ConfigConstants.java | 10 - .../impl/JobManagerProfilerImpl.java | 8 +- .../nephele/discovery/DiscoveryService.java | 292 ++++++++++++------ .../nephele/jobmanager/JobManager.java | 2 +- .../nephele/profiling/ProfilingUtils.java | 17 +- 
.../nephele/taskmanager/TaskManager.java | 24 +- .../discovery/DiscoveryServiceTest.java | 93 +++++- .../local/LocalInstanceManagerTest.java | 47 ++- .../nephele/jobmanager/JobManagerITCase.java | 14 +- .../pact/client/web/WebInterfaceServer.java | 3 +- .../stratosphere-bin/conf/nephele-user.xml | 23 -- 12 files changed, 386 insertions(+), 156 deletions(-) diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java index 0ee7a0fa3ddd4..fa6279a9df7b8 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/client/JobClient.java @@ -133,8 +133,7 @@ public void run() { * @throws IOException * thrown on error while initializing the RPC connection to the job manager */ - public JobClient(JobGraph jobGraph) - throws IOException { + public JobClient(JobGraph jobGraph) throws IOException { this(jobGraph, new Configuration()); } @@ -150,11 +149,9 @@ public JobClient(JobGraph jobGraph) * @throws IOException * thrown on error while initializing the RPC connection to the job manager */ - public JobClient(JobGraph jobGraph, Configuration configuration) - throws IOException { + public JobClient(JobGraph jobGraph, Configuration configuration) throws IOException { - final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_ADDRESS); + final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration/ConfigConstants.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration/ConfigConstants.java index 
eb5580a70535c..669d7ab480377 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration/ConfigConstants.java +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/configuration/ConfigConstants.java @@ -51,11 +51,6 @@ public final class ConfigConstants { */ public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port"; - /** - * The key for the config parameter defining whether to use discovery on startup. - */ - public static final String TASK_MANAGER_USE_DISCOVERY_KEY = "taskmanager.setup.usediscovery"; - /** * The key for the config parameter defining the directory for temporary files. */ @@ -76,11 +71,6 @@ public final class ConfigConstants { // Default Values // ------------------------------------------------------------------------ - /** - * The default network address to connect to for communication with the job manager. - */ - public static final String DEFAULT_JOB_MANAGER_IPC_ADDRESS = "127.0.0.1"; - /** * The default network port to connect to for communication with the job manager. 
*/ diff --git a/nephele/nephele-profiling/src/main/java/eu/stratosphere/nephele/profiling/impl/JobManagerProfilerImpl.java b/nephele/nephele-profiling/src/main/java/eu/stratosphere/nephele/profiling/impl/JobManagerProfilerImpl.java index abcdc342599db..e973acbf5a41c 100644 --- a/nephele/nephele-profiling/src/main/java/eu/stratosphere/nephele/profiling/impl/JobManagerProfilerImpl.java +++ b/nephele/nephele-profiling/src/main/java/eu/stratosphere/nephele/profiling/impl/JobManagerProfilerImpl.java @@ -16,6 +16,7 @@ package eu.stratosphere.nephele.profiling.impl; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; @@ -27,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import eu.stratosphere.nephele.configuration.GlobalConfiguration; -import eu.stratosphere.nephele.discovery.DiscoveryService; import eu.stratosphere.nephele.executiongraph.ExecutionGraph; import eu.stratosphere.nephele.ipc.RPC; import eu.stratosphere.nephele.ipc.Server; @@ -63,14 +63,14 @@ public class JobManagerProfilerImpl implements JobManagerProfiler, ProfilerImplP private final Map registeredJobs = new HashMap(); - public JobManagerProfilerImpl() - throws ProfilingException { + public JobManagerProfilerImpl(InetAddress jobManagerbindAddress) throws ProfilingException { // Start profiling IPC server final int handlerCount = GlobalConfiguration.getInteger(RPC_NUM_HANDLER_KEY, DEFAULT_NUM_HANLDER); final int rpcPort = GlobalConfiguration.getInteger(ProfilingUtils.JOBMANAGER_RPC_PORT_KEY, ProfilingUtils.JOBMANAGER_DEFAULT_RPC_PORT); - final InetSocketAddress rpcServerAddress = new InetSocketAddress(DiscoveryService.getServiceAddress(), rpcPort); + + final InetSocketAddress rpcServerAddress = new InetSocketAddress(jobManagerbindAddress, rpcPort); Server profilingServerTmp = null; try { diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java 
b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java index 8d271bbc657d2..b69dba0bc33d4 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java @@ -27,7 +27,10 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -83,65 +86,90 @@ public class DiscoveryService implements Runnable { private static final int DEFAULT_MAGICNUMBER_VALUE = 0; /** - * The log object used for debugging. + * Flag indicating whether to use IPv6 or not. */ - private static final Log LOG = LogFactory.getLog(DiscoveryService.class); + private static final boolean USE_IPV6 = "true".equals(System.getProperty("java.net.preferIPv4Stack")) ? false + : true; /** - * Singleton instance of the discovery service. + * ID for job manager lookup request packets. */ - private static DiscoveryService discoveryService = null; + private static final int JM_LOOKUP_REQUEST_ID = 0; /** - * The network address the IPC is bound to, possibly null. + * ID for job manager lookup reply packets. */ - private final InetAddress ipcAddress; + private static final int JM_LOOKUP_REPLY_ID = 1; /** - * The network port that is announced for the job manager's IPC service. + * ID for task manager address request packets. */ - private final int ipcPort; + private static final int TM_ADDRESS_REQUEST_ID = 2; /** - * The thread executing the receive operation on the discovery port. + * ID for task manager address reply packets. */ - private Thread listeningThread = null; + private static final int TM_ADDRESS_REPLY_ID = 3; /** - * The datagram socket of the discovery server. + * The default size of response datagram packets. 
*/ - private DatagramSocket serverSocket = null; + private static final int RESPONSE_PACKET_SIZE = 64; /** - * Flag indicating whether to use IPv6 or not. + * The offset inside a packet to the magic number field. */ - private static final boolean USE_IPV6 = "true".equals(System.getProperty("java.net.preferIPv4Stack")) ? false - : true; + private static final int MAGIC_NUMBER_OFFSET = 0; /** - * ID for job manager lookup request packets. + * The offset inside a packet to the packet ID field. */ - private static final int JM_LOOKUP_REQUEST_ID = 0; + private static final int PACKET_ID_OFFSET = 4; /** - * ID for job manager lookup reply packets. + * The offset inside a packet to the packet type ID field. */ - private static final int JM_LOOKUP_REPLY_ID = 1; + private static final int PACKET_TYPE_ID_OFFSET = 8; /** - * ID for task manager address request packets. + * The offset inside a packet to the actual payload. */ - private static final int TM_ADDRESS_REQUEST_ID = 2; + private static final int PAYLOAD_OFFSET = 12; /** - * ID for task manager address reply packets. + * The log object used for debugging. */ - private static final int TM_ADDRESS_REPLY_ID = 3; + private static final Log LOG = LogFactory.getLog(DiscoveryService.class); /** - * The default size of response datagram packets. + * Singleton instance of the discovery service. */ - private static final int RESPONSE_PACKET_SIZE = 64; + private static DiscoveryService discoveryService = null; + + /** + * The network address the IPC is bound to, possibly null. + */ + private final InetAddress ipcAddress; + + /** + * The network port that is announced for the job manager's IPC service. + */ + private final int ipcPort; + + /** + * The thread executing the receive operation on the discovery port. + */ + private Thread listeningThread = null; + + /** + * The datagram socket of the discovery server. 
+ */ + private DatagramSocket serverSocket = null; + + /** + * Flag to check whether the service is running + */ + private volatile boolean isRunning = false; /** * Constructs a new {@link DiscoveryService} object and stores @@ -174,17 +202,32 @@ public static synchronized void startDiscoveryService(final InetAddress ipcAddre if (discoveryService == null) { discoveryService = new DiscoveryService(ipcAddress, ipcPort); + } + + if (!discoveryService.isRunning()) { discoveryService.startService(); } } + /** + * Checks whether the discovery service is running. + * + * @return true if the service is running, false otherwise + */ + public boolean isRunning() { + + return this.isRunning; + } + /** * Stops the discovery service. */ public static synchronized void stopDiscoveryService() { if (discoveryService != null) { - discoveryService.stopService(); + if (discoveryService.isRunning()) { + discoveryService.stopService(); + } } } @@ -204,7 +247,9 @@ private void startService() throws DiscoveryException { throw new DiscoveryException(e.toString()); } - LOG.debug("Discovery service socket is bound to " + this.serverSocket.getLocalSocketAddress()); + LOG.info("Discovery service socket is bound to " + this.serverSocket.getLocalSocketAddress()); + + this.isRunning = true; this.listeningThread = new Thread(this); this.listeningThread.start(); @@ -217,6 +262,8 @@ private void stopService() { LOG.debug("Stopping discovery service on port" + DISCOVERYPORT); + this.isRunning = false; + this.listeningThread.interrupt(); // Close the server socket @@ -231,9 +278,10 @@ private void stopService() { private static DatagramPacket createJobManagerLookupRequestPacket() { final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); - final byte[] bytes = new byte[8]; - integerToByteArray(magicNumber, 0, bytes); - integerToByteArray(JM_LOOKUP_REQUEST_ID, 4, bytes); + final byte[] bytes = new byte[12]; + integerToByteArray(magicNumber, 
MAGIC_NUMBER_OFFSET, bytes); + integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes); + integerToByteArray(JM_LOOKUP_REQUEST_ID, PACKET_TYPE_ID_OFFSET, bytes); return new DatagramPacket(bytes, bytes.length); } @@ -241,15 +289,18 @@ private static DatagramPacket createJobManagerLookupRequestPacket() { /** * Creates a new job manager lookup reply packet. * + * @param ipcPort + * the port of the job manager's IPC server * @return a new job manager lookup reply packet */ private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPort) { final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); - final byte[] bytes = new byte[12]; - integerToByteArray(magicNumber, 0, bytes); - integerToByteArray(JM_LOOKUP_REPLY_ID, 4, bytes); - integerToByteArray(ipcPort, 8, bytes); + final byte[] bytes = new byte[16]; + integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes); + integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes); + integerToByteArray(JM_LOOKUP_REPLY_ID, PACKET_TYPE_ID_OFFSET, bytes); + integerToByteArray(ipcPort, PAYLOAD_OFFSET, bytes); return new DatagramPacket(bytes, bytes.length); } @@ -262,9 +313,10 @@ private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPor private static DatagramPacket createTaskManagerAddressRequestPacket() { final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); - final byte[] bytes = new byte[8]; - integerToByteArray(magicNumber, 0, bytes); - integerToByteArray(TM_ADDRESS_REQUEST_ID, 4, bytes); + final byte[] bytes = new byte[12]; + integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes); + integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes); + integerToByteArray(TM_ADDRESS_REQUEST_ID, PACKET_TYPE_ID_OFFSET, bytes); return new DatagramPacket(bytes, bytes.length); } @@ -280,11 +332,12 @@ private static DatagramPacket createTaskManagerAddressReplyPacket(final 
InetAddr final byte[] addr = taskManagerAddress.getAddress(); final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE); - final byte[] bytes = new byte[12 + addr.length]; - integerToByteArray(magicNumber, 0, bytes); - integerToByteArray(TM_ADDRESS_REPLY_ID, 4, bytes); - integerToByteArray(addr.length, 8, bytes); - System.arraycopy(addr, 0, bytes, 12, addr.length); + final byte[] bytes = new byte[20 + addr.length]; + integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes); + integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes); + integerToByteArray(TM_ADDRESS_REPLY_ID, PACKET_TYPE_ID_OFFSET, bytes); + integerToByteArray(addr.length, PAYLOAD_OFFSET, bytes); + System.arraycopy(addr, 0, bytes, PAYLOAD_OFFSET + 4, addr.length); return new DatagramPacket(bytes, bytes.length); } @@ -321,12 +374,13 @@ public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddr addressRequest.setPort(DISCOVERYPORT); LOG.debug("Sending Task Manager address request to " + addressRequest.getSocketAddress()); + System.out.println("Sending Task Manager address request to " + addressRequest.getSocketAddress()); socket.send(addressRequest); try { socket.receive(responsePacket); } catch (SocketTimeoutException ste) { - LOG.warn("Timeout wainting for address reply. Retrying..."); + LOG.warn("Timeout wainting for task manager address reply. 
Retrying..."); continue; } @@ -393,8 +447,9 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException for (int retries = 0; retries < DISCOVERFAILURERETRIES; retries++) { + final DatagramPacket lookupRequest = createJobManagerLookupRequestPacket(); + for (InetAddress broadcast : targetAddresses) { - final DatagramPacket lookupRequest = createJobManagerLookupRequestPacket(); lookupRequest.setAddress(broadcast); lookupRequest.setPort(DISCOVERYPORT); LOG.debug("Sending discovery request to " + lookupRequest.getSocketAddress()); @@ -415,15 +470,11 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException final int packetTypeID = getPacketTypeID(responsePacket); if (packetTypeID != JM_LOOKUP_REPLY_ID) { - LOG.error("Received unexpected packet type " + packetTypeID + ", discarding... "); + LOG.debug("Received unexpected packet type " + packetTypeID + ", discarding... "); continue; } final int ipcPort = extractIpcPort(responsePacket); - // TODO: This condition helps to deal with legacy implementations of the DiscoveryService - if (ipcPort < 0) { - continue; - } // Replace port from discovery service with the actual RPC port // of the job manager @@ -479,11 +530,11 @@ private static int extractIpcPort(DatagramPacket packet) { return -1; } - if (packet.getLength() < 12) { + if (packet.getLength() < (PAYLOAD_OFFSET + 4)) { return -1; } - return byteArrayToInteger(data, 8); + return byteArrayToInteger(data, PAYLOAD_OFFSET); } /** @@ -502,14 +553,14 @@ private static InetAddress extractInetAddress(DatagramPacket packet) { return null; } - if (packet.getLength() < 16) { + if (packet.getLength() < PAYLOAD_OFFSET + 8) { return null; } - final int len = byteArrayToInteger(data, 8); + final int len = byteArrayToInteger(data, PAYLOAD_OFFSET); final byte[] addr = new byte[len]; - System.arraycopy(data, 12, addr, 0, len); + System.arraycopy(data, PAYLOAD_OFFSET + 4, addr, 0, len); InetAddress inetAddress = null; @@ -523,9 +574,9 @@ 
private static InetAddress extractInetAddress(DatagramPacket packet) { } /** - * Returns the set of broadcast addresses available to the network - * interfaces of this host. In case of IPv6 the set contains the - * IPv6 multicast address to reach all nodes on the local link. + * Returns the set of broadcast addresses available to the network interfaces of this host. In case of IPv6 the set + * contains the IPv6 multicast address to reach all nodes on the local link. Moreover, all addresses of the loopback + * interfaces are added to the set. * * @return (possibly empty) set of broadcast addresses reachable by this host */ @@ -549,27 +600,35 @@ private static Set getBroadcastAddresses() { continue; } - // check all IPs bound to network interfaces - for (InterfaceAddress adr : nic.getInterfaceAddresses()) { + if (nic.isLoopback()) { + for (InterfaceAddress adr : nic.getInterfaceAddresses()) { + broadcastAddresses.add(adr.getAddress()); + } + } else { - // collect all broadcast addresses - if (USE_IPV6) { - try { - final InetAddress interfaceAddress = adr.getAddress(); - if (interfaceAddress instanceof Inet6Address) { - final Inet6Address ipv6Address = (Inet6Address) interfaceAddress; - final InetAddress multicastAddress = InetAddress.getByName(IPV6MULTICASTADDRESS + "%" - + Integer.toString(ipv6Address.getScopeId())); - broadcastAddresses.add(multicastAddress); + // check all IPs bound to network interfaces + for (InterfaceAddress adr : nic.getInterfaceAddresses()) { + + // collect all broadcast addresses + if (USE_IPV6) { + try { + final InetAddress interfaceAddress = adr.getAddress(); + if (interfaceAddress instanceof Inet6Address) { + final Inet6Address ipv6Address = (Inet6Address) interfaceAddress; + final InetAddress multicastAddress = InetAddress.getByName(IPV6MULTICASTADDRESS + + "%" + + Integer.toString(ipv6Address.getScopeId())); + broadcastAddresses.add(multicastAddress); + } + + } catch (UnknownHostException e) { + LOG.error(e); + } + } else { + final 
InetAddress broadcast = adr.getBroadcast(); + if (broadcast != null) { + broadcastAddresses.add(broadcast); } - - } catch (UnknownHostException e) { - LOG.error(e); - } - } else { - final InetAddress broadcast = adr.getBroadcast(); - if (broadcast != null) { - broadcastAddresses.add(broadcast); } } } @@ -590,16 +649,42 @@ public void run() { final DatagramPacket requestPacket = new DatagramPacket(new byte[64], 64); - while (!Thread.interrupted()) { + final Map packetIDMap = new HashMap(); + + while (this.isRunning) { try { this.serverSocket.receive(requestPacket); - + + System.out.println("PACKET RECEIVED"); + if (!isPacketForUs(requestPacket)) { LOG.debug("Received request packet which is not destined to this Nephele setup"); continue; } + final Integer packetID = Integer.valueOf(extractPacketID(requestPacket)); + if(packetIDMap.containsKey(packetID)) { + LOG.debug("Request with ID " + packetID.intValue() + " already answered, discarding..."); + continue; + } else { + + final long currentTime = System.currentTimeMillis(); + + //Remove old entries + final Iterator> it = packetIDMap.entrySet().iterator(); + while(it.hasNext()) { + + final Map.Entry entry = it.next(); + if((entry.getValue().longValue() + 5000L) < currentTime) { + it.remove(); + } + } + + packetIDMap.put(packetID, Long.valueOf(currentTime)); + } + + final int packetTypeID = getPacketTypeID(requestPacket); if (packetTypeID == JM_LOOKUP_REQUEST_ID) { @@ -620,13 +705,15 @@ public void run() { this.serverSocket.send(responsePacket); } else { - LOG.error("Received packet of unknown type " + packetTypeID + ", discarding..."); + LOG.debug("Received packet of unknown type " + packetTypeID + ", discarding..."); } } catch (SocketTimeoutException ste) { LOG.debug("Discovery service: socket timeout"); } catch (IOException ioe) { - LOG.error("Discovery service stopped working with IOException:\n" + ioe.toString()); + if (this.isRunning) { // Ignore exception when service has been stopped + LOG.error("Discovery 
service stopped working with IOException:\n" + ioe.toString()); + } break; } } @@ -666,10 +753,6 @@ private static int byteArrayToInteger(final byte[] byteArray, final int offset) int integer = 0; - for (int i = 0; i < 4; ++i) { - System.out.print(byteArray[i]); - } - for (int i = 0; i < 4; ++i) { integer |= (byteArray[(offset + 3) - i] & 0xff) << (i << 3); } @@ -693,11 +776,12 @@ private static boolean isPacketForUs(final DatagramPacket packet) { return false; } - if (packet.getLength() < 4) { + if (packet.getLength() < (MAGIC_NUMBER_OFFSET + 4)) { return false; } - if (byteArrayToInteger(data, 0) != GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE)) { + if (byteArrayToInteger(data, MAGIC_NUMBER_OFFSET) != GlobalConfiguration.getInteger(MAGICNUMBER_KEY, + DEFAULT_MAGICNUMBER_VALUE)) { return false; } @@ -719,10 +803,42 @@ private static int getPacketTypeID(final DatagramPacket packet) { return -1; } - if (packet.getLength() < 8) { + if (packet.getLength() < (PACKET_TYPE_ID_OFFSET + 4)) { + return -1; + } + + return byteArrayToInteger(data, PACKET_TYPE_ID_OFFSET); + } + + /** + * Generates a random packet ID. + * + * @return a random packet ID + */ + private static int generateRandomPacketID() { + + return (int) (Math.random() * (double) Integer.MAX_VALUE); + } + + /** + * Extracts the packet ID from the given packet. 
+ * + * @param packet + * the packet to extract the ID from + * @return the extracted ID or -1 if the ID could not be extracted + */ + private static int extractPacketID(final DatagramPacket packet) { + + final byte[] data = packet.getData(); + + if (data == null) { + return -1; + } + + if (data.length < (PACKET_ID_OFFSET + 4)) { return -1; } - return byteArrayToInteger(data, 4); + return byteArrayToInteger(data, PACKET_ID_OFFSET); } } diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java index 3741b2655c6f5..a34eb45dffbb5 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java @@ -238,7 +238,7 @@ public JobManager(String configDir, String executionMode) { LOG.error("Cannot find class name for the profiler"); System.exit(FAILURERETURNCODE); } - this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName); + this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName, ipcAddress); if (this.profiler == null) { LOG.error("Cannot load profiler"); System.exit(FAILURERETURNCODE); diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/profiling/ProfilingUtils.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/profiling/ProfilingUtils.java index 57f2e401b1f82..e1823aae9660e 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/profiling/ProfilingUtils.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/profiling/ProfilingUtils.java @@ -83,10 +83,12 @@ public class ProfilingUtils { * * @param profilerClassName * the class name of the profiling component to load + * @param jobManagerBindAddress + * the address the job manager's RPC server is bound to * @return an instance of the job manager profiling 
component or null if an error occurs */ @SuppressWarnings("unchecked") - public static JobManagerProfiler loadJobManagerProfiler(String profilerClassName) { + public static JobManagerProfiler loadJobManagerProfiler(String profilerClassName, InetAddress jobManagerBindAddress) { final Class profilerClass; try { @@ -97,9 +99,18 @@ public static JobManagerProfiler loadJobManagerProfiler(String profilerClassName } JobManagerProfiler profiler = null; - + try { - profiler = profilerClass.newInstance(); + + final Constructor constr = (Constructor) profilerClass.getConstructor(InetAddress.class); + profiler = constr.newInstance(jobManagerBindAddress); + + } catch(InvocationTargetException e) { + LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); + return null; + } catch (NoSuchMethodException e) { + LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); + return null; } catch (InstantiationException e) { LOG.error("Cannot create profiler: " + StringUtils.stringifyException(e)); return null; diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index dd7d446ddb021..c95fb75b4ba0a 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -163,22 +164,31 @@ public TaskManager(String configDir) throws Exception { GlobalConfiguration.loadConfiguration(configDir); // Use discovery service to find the job manager in the network? 
+ final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,null); InetSocketAddress jobManagerAddress = null; - if (GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_USE_DISCOVERY_KEY, true)) { + if(address == null) { + // Address is null, use discovery manager to determine address + LOG.info("Using discovery service to locate job manager"); try { jobManagerAddress = DiscoveryService.getJobManagerAddress(); } catch (DiscoveryException e) { - throw new Exception("Failed to initialize discovery service. " + e.getMessage(), e); + throw new Exception("Failed to locate job manager via discovery: " + e.getMessage(), e); } - } else { - final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_ADDRESS); + LOG.info("Reading location of job manager from configuration"); + final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - jobManagerAddress = new InetSocketAddress(address, port); + + // Try to convert configured address to {@link InetAddress} + try { + final InetAddress tmpAddress = InetAddress.getByName(address); + jobManagerAddress = new InetSocketAddress(tmpAddress, port); + } catch(UnknownHostException e) { + throw new Exception("Failed to locate job manager based on configuration: " + e.getMessage(), e); + } } - + LOG.info("Determined address of job manager to be " + jobManagerAddress); // Determine interface address that is announced to the job manager diff --git a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java index e4d8af5795106..8a42f506796b2 100644 --- a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java +++ 
b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/discovery/DiscoveryServiceTest.java @@ -15,21 +15,100 @@ package eu.stratosphere.nephele.discovery; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + /** * This class contains tests for the {@link DiscoveryService} class. * * @author warneke - * */ public class DiscoveryServiceTest { -/* @BeforeClass - public static void startService() { - + /** + * The dummy IPC port used during the tests. + */ + private static final int IPC_PORT = 5555; + + /** + * Starts the discovery service before the tests. + * + * @throws UnknownHostException + * thrown if the {@link InetAddress} of localhost cannot be determined + * @throws DiscoveryException + * thrown if an error occurs during the start of the discovery manager + */ + @BeforeClass + public static void startService() throws UnknownHostException, DiscoveryException { + + final InetAddress bindAddress = InetAddress.getLocalHost(); + + DiscoveryService.startDiscoveryService(bindAddress, IPC_PORT); } - @AfterClass - public static void startService() { + /** + * Tests the job manager discovery function. + */ + @Test + public void testJobManagerDiscovery() { + + InetAddress localHost = null; - }*/ + try { + localHost = InetAddress.getLocalHost(); + } catch(UnknownHostException e) { + fail(e.getMessage()); + } + + try { + final InetSocketAddress jobManagerAddress = DiscoveryService.getJobManagerAddress(); + + assertEquals(localHost, jobManagerAddress.getAddress()); + assertEquals(IPC_PORT, jobManagerAddress.getPort()); + + } catch(DiscoveryException e) { + fail(e.getMessage()); + } + } + + /** + * Tests if the task manager address resolution works properly. 
+ */ + @Test + public void testTaskManagerAddressResolution() { + + InetAddress localHost = null; + + try { + localHost = InetAddress.getLocalHost(); + } catch(UnknownHostException e) { + fail(e.getMessage()); + } + + try { + final InetAddress taskManagerAddress = DiscoveryService.getTaskManagerAddress(localHost); + + assertEquals(localHost, taskManagerAddress); + + } catch(DiscoveryException e) { + fail(e.getMessage()); + } + } + + /** + * Shuts the discovery service down after the tests. + */ + @AfterClass + public static void stopService() { + + DiscoveryService.stopDiscoveryService(); + } } diff --git a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java index e5311928e36eb..5fa0f359c34b6 100644 --- a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java +++ b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java @@ -17,9 +17,17 @@ import static org.junit.Assert.*; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import eu.stratosphere.nephele.configuration.ConfigConstants; import eu.stratosphere.nephele.configuration.GlobalConfiguration; +import eu.stratosphere.nephele.discovery.DiscoveryException; +import eu.stratosphere.nephele.discovery.DiscoveryService; import eu.stratosphere.nephele.instance.InstanceType; /** @@ -39,6 +47,42 @@ public class LocalInstanceManagerTest { */ private static final String CORRECT_CONF_DIR = "/correct-conf"; + /** + * Starts the discovery service before the tests. 
+ */ + @BeforeClass + public static void startDiscoveryService() { + + final String configDir = System.getProperty(USER_DIR_KEY) + CORRECT_CONF_DIR; + + GlobalConfiguration.loadConfiguration(configDir); + + final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + InetAddress bindAddress = null; + if(address != null) { + try { + bindAddress = InetAddress.getByName(address); + } catch(UnknownHostException e) { + fail(e.getMessage()); + } + } + + try { + DiscoveryService.startDiscoveryService(bindAddress, 5555); + } catch(DiscoveryException e) { + fail(e.getMessage()); + } + } + + /** + * Stops the discovery service after the tests. + */ + @AfterClass + public static void stopDiscoveryService() { + + DiscoveryService.stopDiscoveryService(); + } + /** * Checks if the local instance manager reads the default correctly from the configuration file. */ @@ -46,8 +90,7 @@ public class LocalInstanceManagerTest { public void testInstanceTypeFromConfiguration() { final String configDir = System.getProperty(USER_DIR_KEY) + CORRECT_CONF_DIR; - - GlobalConfiguration.loadConfiguration(configDir); + final TestInstanceListener testInstanceListener = new TestInstanceListener(); LocalInstanceManager lm = null; diff --git a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java index bcc8fe436af20..67eb390ca604a 100644 --- a/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java +++ b/nephele/nephele-server/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java @@ -31,6 +31,9 @@ import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobExecutionException; +import eu.stratosphere.nephele.configuration.ConfigConstants; +import eu.stratosphere.nephele.configuration.Configuration; +import 
eu.stratosphere.nephele.configuration.GlobalConfiguration; import eu.stratosphere.nephele.fs.Path; import eu.stratosphere.nephele.io.channels.ChannelType; import eu.stratosphere.nephele.io.compression.CompressionLevel; @@ -54,6 +57,8 @@ public class JobManagerITCase { private static JobManagerThread jobManagerThread = null; + private static Configuration configuration; + /** * This is an auxiliary class to run the job manager thread. * @@ -135,6 +140,9 @@ public static void startNephele() { fail(e.getMessage()); } + configuration = GlobalConfiguration + .getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY }); + // Start job manager thread if (jobManager != null) { jobManagerThread = new JobManagerThread(jobManager); @@ -233,7 +241,7 @@ public void testExecutionWithException() { jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + exceptionClassName + ".jar")); // Create job client and launch job - final JobClient jobClient = new JobClient(jg); + final JobClient jobClient = new JobClient(jg, configuration); try { jobClient.submitJobAndWait(); @@ -317,7 +325,7 @@ public void testExecutionWithRuntimeException() { + ".jar")); // Create job client and launch job - final JobClient jobClient = new JobClient(jg); + final JobClient jobClient = new JobClient(jg, configuration); try { jobClient.submitJobAndWait(); @@ -416,7 +424,7 @@ private void test(int limit) { jg.addJar(new Path("file://" + ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar")); // Create job client and launch job - JobClient jobClient = new JobClient(jg); + JobClient jobClient = new JobClient(jg, configuration); try { jobClient.submitJobAndWait(); } catch (JobExecutionException e) { diff --git a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/web/WebInterfaceServer.java b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/web/WebInterfaceServer.java index fc74826c2e96b..df43f22bd9093 100644 --- 
a/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/web/WebInterfaceServer.java +++ b/pact/pact-clients/src/main/java/eu/stratosphere/pact/client/web/WebInterfaceServer.java @@ -137,8 +137,7 @@ public WebInterfaceServer(Configuration nepheleConfig, int port) + planDumpDir.getAbsolutePath() + "'."); LOG.debug("Web-frontend will submit jobs to nephele job-manager on " - + config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_ADDRESS) + ", port " + + config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port " + config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + "."); diff --git a/stratosphere-dist/src/main/stratosphere-bin/conf/nephele-user.xml b/stratosphere-dist/src/main/stratosphere-bin/conf/nephele-user.xml index 0395c61546386..1f863300ca148 100644 --- a/stratosphere-dist/src/main/stratosphere-bin/conf/nephele-user.xml +++ b/stratosphere-dist/src/main/stratosphere-bin/conf/nephele-user.xml @@ -17,29 +17,6 @@ --> - - - - - jobmanager.rpc.address - localhost - - - - - jobmanager.rpc.port - 6123 - - - - - taskmanager.rpc.port - 6122 - - - From f2d758b2693347701e6cc67b1316825dbe2a8af6 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 16:57:38 +0100 Subject: [PATCH 3/8] Fixed potential source for deadlocks in case of task errors --- .../nephele/taskmanager/TaskManager.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index c95fb75b4ba0a..e2d14b06e02bf 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; 
import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -522,7 +523,7 @@ public TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException { final Environment environment = tmpEnvironment; // Execute call in a new thread so IPC thread can return immediately - Thread tmpThread = new Thread(new Runnable() { + final Thread tmpThread = new Thread(new Runnable() { @Override public void run() { @@ -854,6 +855,8 @@ public synchronized boolean isShutDown() { */ private void checkTaskExecution() { + final List crashEnvironments = new LinkedList(); + synchronized (this.runningTasks) { final Iterator it = this.runningTasks.keySet().iterator(); @@ -864,10 +867,16 @@ private void checkTaskExecution() { if (environment.getExecutingThread().getState() == Thread.State.TERMINATED) { // Remove entry from the running tasks map it.remove(); - environment.changeExecutionState(ExecutionState.FAILED, "Execution thread died unexpectedly"); + //Don't do IPC call while holding a lock on the runningTasks map + crashEnvironments.add(environment); } } } + + final Iterator it2 = crashEnvironments.iterator(); + while(it2.hasNext()) { + it2.next().changeExecutionState(ExecutionState.FAILED, "Execution thread died unexpectedly"); + } } /** From 54ca0e042ae0d2f058911da1e6d9905d576f5224 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 16:58:34 +0100 Subject: [PATCH 4/8] Minor modification to generation of random filenames --- .../src/main/java/eu/stratosphere/nephele/util/FileUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/FileUtils.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/FileUtils.java index 88f52ece1038e..bc88ceef4f299 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/FileUtils.java +++
b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/util/FileUtils.java @@ -46,7 +46,7 @@ public static String getRandomFilename(String prefix) { final StringBuilder stringBuilder = new StringBuilder(prefix); for (int i = 0; i < LENGTH; i++) { - stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * ALPHABET.length)]); + stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * (double)ALPHABET.length)]); } return stringBuilder.toString(); From c8817968e91de284e64092d6b57ec2ee1cf34eeb Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 20:57:14 +0100 Subject: [PATCH 5/8] Fixed bug in equals method of IncomingConnectionID --- .../bytebuffered/IncomingConnectionID.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionID.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionID.java index 0d8fb33b2f007..4d942e625defe 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionID.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionID.java @@ -75,10 +75,16 @@ public IncomingConnectionID(String fileName) { @Override public boolean equals(Object obj) { + if (!(obj instanceof IncomingConnectionID)) { + return false; + } + + final IncomingConnectionID ici = (IncomingConnectionID) obj; + if (this.inetAddress != null) { - return this.inetAddress.equals(obj); + return this.inetAddress.equals(ici.inetAddress); } else { - return this.fileName.equals(obj); + return this.fileName.equals(ici.fileName); } } @@ -94,13 +100,13 @@ public int hashCode() { return this.fileName.hashCode(); } } - + /** * {@inheritDoc} */ @Override public String toString() { - + if (this.inetAddress != null) { return this.inetAddress.toString(); } else { From 
5128882a6ca5ed40c3b24a077a338712e0fcf1e8 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 21:11:24 +0100 Subject: [PATCH 6/8] Removed debugging output from DiscoveryService --- .../nephele/discovery/DiscoveryService.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java index b69dba0bc33d4..f3e6699ca287b 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/discovery/DiscoveryService.java @@ -203,7 +203,7 @@ public static synchronized void startDiscoveryService(final InetAddress ipcAddre if (discoveryService == null) { discoveryService = new DiscoveryService(ipcAddress, ipcPort); } - + if (!discoveryService.isRunning()) { discoveryService.startService(); } @@ -374,7 +374,6 @@ public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddr addressRequest.setPort(DISCOVERYPORT); LOG.debug("Sending Task Manager address request to " + addressRequest.getSocketAddress()); - System.out.println("Sending Task Manager address request to " + addressRequest.getSocketAddress()); socket.send(addressRequest); try { @@ -655,36 +654,33 @@ public void run() { try { this.serverSocket.receive(requestPacket); - - System.out.println("PACKET RECEIVED"); - + if (!isPacketForUs(requestPacket)) { LOG.debug("Received request packet which is not destined to this Nephele setup"); continue; } final Integer packetID = Integer.valueOf(extractPacketID(requestPacket)); - if(packetIDMap.containsKey(packetID)) { + if (packetIDMap.containsKey(packetID)) { LOG.debug("Request with ID " + packetID.intValue() + " already answered, discarding..."); continue; } else { - + final long currentTime = System.currentTimeMillis(); - - 
//Remove old entries + + // Remove old entries final Iterator> it = packetIDMap.entrySet().iterator(); - while(it.hasNext()) { - + while (it.hasNext()) { + final Map.Entry entry = it.next(); - if((entry.getValue().longValue() + 5000L) < currentTime) { + if ((entry.getValue().longValue() + 5000L) < currentTime) { it.remove(); } } - + packetIDMap.put(packetID, Long.valueOf(currentTime)); } - - + final int packetTypeID = getPacketTypeID(requestPacket); if (packetTypeID == JM_LOOKUP_REQUEST_ID) { From 7715a537d8ac42ac5732164c3b81eb41ce3524bd Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 21:12:47 +0100 Subject: [PATCH 7/8] OutgoingConnections are no longer closed immediately but after being idle for 3 sec. --- .../bytebuffered/OutgoingConnection.java | 74 ++++++++++++++++++- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java index 2ff42f12dd37f..3eadb3090848c 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java @@ -89,11 +89,56 @@ public class OutgoingConnection { */ private long timstampOfLastRetry = 0; + /** + * The timestamp of the last data transfer. + */ + private long timestampOfLastTransfer = 0; + + /** + * The time at which the connection has been closed. + */ + private long closeTimestamp = 0; + /** * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed. 
*/ private static long RETRYINTERVAL = 1000L; // 1 second + /** + * The minimum time the connection must be idle before the underlying TCP connection is closed + */ + private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 3000L; // 3 seconds + + /** + * The minimum time that is waited before a connection is reestablished after it has been closed + */ + private static final long MIN_TIME_BEFORE_CONNECTION = 3000L; // 3 seconds + + private static class ConnectionLauncher extends Thread { + + private final OutgoingConnection outgoingConnection; + + private final long sleepTime; + + private ConnectionLauncher(OutgoingConnection outgoingConnection, long sleepTime) { + + this.outgoingConnection = outgoingConnection; + this.sleepTime = sleepTime; + } + + @Override + public void run() { + + System.out.println("Sleeping for " + this.sleepTime); + try { + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } + + this.outgoingConnection.triggerConnection(); + } + } + /** * Constructs a new outgoing connection object. 
* @@ -129,15 +174,33 @@ public void queueEnvelope(TransferEnvelope transferEnvelope) { synchronized (this.queuedEnvelopes) { + if (!this.isConnected) { + + final long now = System.currentTimeMillis(); + if (this.closeTimestamp + MIN_TIME_BEFORE_CONNECTION > now) { + // Trigger connection delayed + new ConnectionLauncher(this, (this.closeTimestamp + MIN_TIME_BEFORE_CONNECTION) - now).start(); + } else { + // Trigger connection immediately + triggerConnection(); + } + } + + this.queuedEnvelopes.add(transferEnvelope); + } + } + + private void triggerConnection() { + + synchronized (this.queuedEnvelopes) { if (!this.isConnected) { this.retriesLeft = this.numberOfConnectionRetries; this.timstampOfLastRetry = System.currentTimeMillis(); this.connectionThread.triggerConnect(this); this.isConnected = true; } - - this.queuedEnvelopes.add(transferEnvelope); } + } /** @@ -351,6 +414,7 @@ public boolean write(WritableByteChannel writableByteChannel) throws IOException synchronized (this.queuedEnvelopes) { this.queuedEnvelopes.poll(); this.currentEnvelope = null; + this.timestampOfLastTransfer = System.currentTimeMillis(); } } @@ -372,12 +436,18 @@ public boolean write(WritableByteChannel writableByteChannel) throws IOException */ public void closeConnection(SocketChannel socketChannel, SelectionKey key) throws IOException { + if ((this.timestampOfLastTransfer + MIN_IDLE_TIME_BEFORE_CLOSE) > System.currentTimeMillis()) { + return; + } + synchronized (this.queuedEnvelopes) { if (this.queuedEnvelopes.isEmpty()) { + LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress()); socketChannel.close(); key.cancel(); this.isConnected = false; + this.closeTimestamp = System.currentTimeMillis(); } } } From aff72d3374b14258abe79f41fa3bc2e7e1dffcee Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Sun, 27 Feb 2011 21:14:35 +0100 Subject: [PATCH 8/8] Simplified state model of file buffer manager --- .../nephele/io/channels/FileBuffer.java | 8 +- 
.../io/channels/FileBufferManager.java | 116 +++++++----------- 2 files changed, 49 insertions(+), 75 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBuffer.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBuffer.java index 50d45a7b3086e..7bb496311f15b 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBuffer.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBuffer.java @@ -86,10 +86,6 @@ public int read(ByteBuffer dst) throws IOException { return -1; } - if (this.totalBytesRead >= this.bufferSize) { - return -1; - } - final int rem = remaining(); int bytesRead; if (dst.remaining() > rem) { @@ -101,6 +97,10 @@ public int read(ByteBuffer dst) throws IOException { bytesRead = this.fileChannel.read(dst); } + if (bytesRead < 0) { + return -1; + } + this.totalBytesRead += bytesRead; return bytesRead; diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBufferManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBufferManager.java index b91b75d34f702..09fd33531a972 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBufferManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBufferManager.java @@ -54,6 +54,10 @@ public class FileBufferManager { private final String tmpDir; + private static enum FileEntryStatus { + CLOSED, WRITING, WRITING_BUT_READ_REQUESTED + }; + /** * Objects of this class store management information of each channel * pair that uses file buffers. @@ -64,17 +68,7 @@ public class FileBufferManager { */ private class FileBufferManagerEntry { - /** - * Stores whether the reading thread is currently waiting - * for an input file to be closed. 
- */ - private boolean readAttemptInProgress = false; - - /** - * Stores if there is currently a writing thread which - * could write the input file. - */ - private boolean writeAttemptInProgress = false; + private FileEntryStatus status = FileEntryStatus.CLOSED; /** * Stores whether the data written to disk by this @@ -83,16 +77,8 @@ private class FileBufferManagerEntry { */ private final boolean isTemporaryFile; - /** - * The channel which is currently used to - * read from an output file. - */ private FileChannel fileChannelForReading = null; - /** - * The channel which is currently used to - * write to the input file. - */ private FileChannel fileChannelForWriting = null; /** @@ -104,11 +90,8 @@ private class FileBufferManagerEntry { /** * A list of output files ready to be read. */ - private Deque filesForReading = new ArrayDeque(); + private final Deque filesForReading = new ArrayDeque(); - /** - * The file which is used as the current input file. - */ private File currentFileForWriting = null; /** @@ -152,19 +135,15 @@ private synchronized FileChannel getFileChannelForReading() throws IOException { return this.fileChannelForReading; } - this.readAttemptInProgress = true; - try { - while (this.filesForReading.isEmpty()) { + if (this.status == FileEntryStatus.CLOSED) { + closeCurrentWriteFile(); + } - // System.out.println("Size: " + this.filesForReading.size() + ", " + this.writeAttemptInProgress + - // ", " + this.fileChannelForWriting); - if (!this.writeAttemptInProgress && this.fileChannelForWriting != null) { - closeCurrentWriteFile(); - } else { - this.wait(); - } + while (this.filesForReading.isEmpty()) { + this.status = FileEntryStatus.WRITING_BUT_READ_REQUESTED; + this.wait(); } final File file = this.filesForReading.peek(); @@ -173,13 +152,24 @@ private synchronized FileChannel getFileChannelForReading() throws IOException { this.fileChannelForReading = fis.getChannel(); } catch (InterruptedException e) { LOG.error(e); - } finally { - 
this.readAttemptInProgress = false; } return this.fileChannelForReading; } + private void closeCurrentWriteFile() throws IOException { + + if (this.fileChannelForWriting != null) { + + this.fileChannelForWriting.close(); + this.fileChannelForWriting = null; + + this.filesForReading.add(this.currentFileForWriting); + this.notify(); + this.currentFileForWriting = null; + } + } + /** * Returns the channel the writing thread is supposed to use to * write data to the file. @@ -197,7 +187,7 @@ private synchronized FileChannel getFileChannelForWriting() throws IOException { this.fileChannelForWriting = fos.getChannel(); } - this.writeAttemptInProgress = true; + this.status = FileEntryStatus.WRITING; return this.fileChannelForWriting; } @@ -225,35 +215,17 @@ private synchronized void checkForEndOfFile() throws IOException { } } - /** - * Closes the current input file and adds it to - * the list of files which are ready to be read - * from. - * - * @throws IOException - * thrown if an error occurs while closing the input file - */ - private void closeCurrentWriteFile() throws IOException { - - this.fileChannelForWriting.close(); - this.filesForReading.add(this.currentFileForWriting); - this.currentFileForWriting = null; - this.fileChannelForWriting = null; - notify(); - } - - private synchronized boolean cleanUpPossible() { - - return (this.fileChannelForWriting == null); - } - private synchronized void reportEndOfWritePhase() throws IOException { - if (this.readAttemptInProgress) { + if (this.status == FileEntryStatus.CLOSED) { + throw new IOException("reportEndOfWritePhase is called, but file entry status is CLOSED"); + } + + if (this.status == FileEntryStatus.WRITING_BUT_READ_REQUESTED) { closeCurrentWriteFile(); } - this.writeAttemptInProgress = false; + this.status = FileEntryStatus.CLOSED; } } @@ -319,19 +291,10 @@ public FileChannel getFileChannelForWriting(ChannelID sourceChannelID) throws IO public void reportFileBufferAsConsumed(ChannelID sourceChannelID) { + 
FileBufferManagerEntry fbme = null; synchronized (this.dataSources) { - FileBufferManagerEntry fbme = this.dataSources.get(sourceChannelID); - if (fbme == null) { - LOG.error("Cannot find data source for channel " + sourceChannelID + " to mark buffer as consumed"); - return; - } - - try { - fbme.checkForEndOfFile(); - } catch (IOException ioe) { - LOG.error(ioe); - } + fbme = this.dataSources.get(sourceChannelID); // Clean up // TODO: Fix this @@ -341,6 +304,17 @@ public void reportFileBufferAsConsumed(ChannelID sourceChannelID) { * } */ } + + if (fbme == null) { + LOG.error("Cannot find data source for channel " + sourceChannelID + " to mark buffer as consumed"); + return; + } + + try { + fbme.checkForEndOfFile(); + } catch (IOException ioe) { + LOG.error(ioe); + } } public void reportEndOfWritePhase(ChannelID sourceChannelID) throws IOException {