Skip to content

Commit

Permalink
Merge branch 'version02' into stage1_version02
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jun 26, 2012
2 parents b5f2a3f + a44cc54 commit 397bf1d
Showing 1 changed file with 62 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@
*/
public class DiscoveryService implements Runnable {

/**
* Network port of the discovery listens on for incoming connections.
*/
private static final int DISCOVERYPORT = 7001;

/**
* Number of retries before discovery is considered to be failed.
*/
Expand All @@ -83,7 +78,18 @@ public class DiscoveryService implements Runnable {
/**
* The default magic number.
*/
private static final int DEFAULT_MAGICNUMBER_VALUE = 0;
private static final int DEFAULT_MAGICNUMBER = 0;

/**
* The key to retrieve the network port the discovery service listens on for incoming connections from the
* configuration.
*/
private static final String DISCOVERYPORT_KEY = "discoveryservice.port";

/**
* The default network port the discovery service listens on for incoming connections.
*/
private static final int DEFAULT_DISCOVERYPORT = 7001;

/**
* Flag indicating whether to use IPv6 or not.
Expand Down Expand Up @@ -146,6 +152,16 @@ public class DiscoveryService implements Runnable {
*/
private static DiscoveryService discoveryService = null;

/**
* The network port the discovery service listens on for incoming connections.
*/
private final int discoveryPort;

/**
* The magic number used to identify this instance of the discovery service.
*/
private final int magicNumber;

/**
* The network address the IPC is bound to, possibly <code>null</code>.
*/
Expand Down Expand Up @@ -182,6 +198,9 @@ public class DiscoveryService implements Runnable {
*/
private DiscoveryService(final InetAddress ipcAddress, final int ipcPort) {

this.discoveryPort = GlobalConfiguration.getInteger(DISCOVERYPORT_KEY, DEFAULT_DISCOVERYPORT);
this.magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER);

this.ipcAddress = ipcAddress;
this.ipcPort = ipcPort;
}
Expand Down Expand Up @@ -242,7 +261,7 @@ public static synchronized void stopDiscoveryService() {
private void startService() throws DiscoveryException {

try {
this.serverSocket = new DatagramSocket(DISCOVERYPORT, this.ipcAddress);
this.serverSocket = new DatagramSocket(this.discoveryPort, this.ipcAddress);
} catch (SocketException e) {
throw new DiscoveryException(e.toString());
}
Expand All @@ -260,7 +279,9 @@ private void startService() throws DiscoveryException {
*/
private void stopService() {

LOG.debug("Stopping discovery service on port" + DISCOVERYPORT);
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping discovery service on port" + this.discoveryPort);
}

this.isRunning = false;

Expand All @@ -273,11 +294,12 @@ private void stopService() {
/**
* Creates a new job manager lookup request packet.
*
* @param magicNumber
* the magic number to identify this discovery service
* @return a new job manager lookup request packet
*/
private static DatagramPacket createJobManagerLookupRequestPacket() {
private static DatagramPacket createJobManagerLookupRequestPacket(final int magicNumber) {

final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE);
final byte[] bytes = new byte[12];
integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes);
integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes);
Expand All @@ -291,11 +313,12 @@ private static DatagramPacket createJobManagerLookupRequestPacket() {
*
* @param ipcPort
* the port of the job manager's IPC server
* @param magicNumber
* the magic number to identify this discovery service
* @return a new job manager lookup reply packet
*/
private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPort) {
private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPort, final int magicNumber) {

final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE);
final byte[] bytes = new byte[16];
integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes);
integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes);
Expand All @@ -308,11 +331,12 @@ private static DatagramPacket createJobManagerLookupReplyPacket(final int ipcPor
/**
* Creates a new task manager address request packet.
*
* @param magicNumber
* the magic number to identify this discovery service
* @return a new task manager address request packet
*/
private static DatagramPacket createTaskManagerAddressRequestPacket() {
private static DatagramPacket createTaskManagerAddressRequestPacket(final int magicNumber) {

final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE);
final byte[] bytes = new byte[12];
integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes);
integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes);
Expand All @@ -326,12 +350,14 @@ private static DatagramPacket createTaskManagerAddressRequestPacket() {
*
* @param taskManagerAddress
* the address of the task manager which sent the request
* @param magicNumber
* the magic number to identify this discovery service
* @return a new task manager address reply packet
*/
private static DatagramPacket createTaskManagerAddressReplyPacket(final InetAddress taskManagerAddress) {
private static DatagramPacket createTaskManagerAddressReplyPacket(final InetAddress taskManagerAddress,
final int magicNumber) {

final byte[] addr = taskManagerAddress.getAddress();
final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER_VALUE);
final byte[] bytes = new byte[20 + addr.length];
integerToByteArray(magicNumber, MAGIC_NUMBER_OFFSET, bytes);
integerToByteArray(generateRandomPacketID(), PACKET_ID_OFFSET, bytes);
Expand All @@ -354,8 +380,10 @@ private static DatagramPacket createTaskManagerAddressReplyPacket(final InetAddr
*/
public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddress) throws DiscoveryException {

InetAddress taskManagerAddress = null;
final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER);
final int discoveryPort = GlobalConfiguration.getInteger(DISCOVERYPORT_KEY, DEFAULT_DISCOVERYPORT);

InetAddress taskManagerAddress = null;
DatagramSocket socket = null;

try {
Expand All @@ -369,9 +397,9 @@ public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddr

for (int retries = 0; retries < DISCOVERFAILURERETRIES; retries++) {

final DatagramPacket addressRequest = createTaskManagerAddressRequestPacket();
final DatagramPacket addressRequest = createTaskManagerAddressRequestPacket(magicNumber);
addressRequest.setAddress(jobManagerAddress);
addressRequest.setPort(DISCOVERYPORT);
addressRequest.setPort(discoveryPort);

LOG.debug("Sending Task Manager address request to " + addressRequest.getSocketAddress());
socket.send(addressRequest);
Expand All @@ -383,7 +411,7 @@ public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddr
continue;
}

if (!isPacketForUs(responsePacket)) {
if (!isPacketForUs(responsePacket, magicNumber)) {
LOG.warn("Received packet which is not destined to this Nephele setup");
continue;
}
Expand Down Expand Up @@ -424,8 +452,10 @@ public static InetAddress getTaskManagerAddress(final InetAddress jobManagerAddr
*/
public static InetSocketAddress getJobManagerAddress() throws DiscoveryException {

InetSocketAddress jobManagerAddress = null;
final int magicNumber = GlobalConfiguration.getInteger(MAGICNUMBER_KEY, DEFAULT_MAGICNUMBER);
final int discoveryPort = GlobalConfiguration.getInteger(DISCOVERYPORT_KEY, DEFAULT_DISCOVERYPORT);

InetSocketAddress jobManagerAddress = null;
DatagramSocket socket = null;

try {
Expand All @@ -446,11 +476,11 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException

for (int retries = 0; retries < DISCOVERFAILURERETRIES; retries++) {

final DatagramPacket lookupRequest = createJobManagerLookupRequestPacket();
final DatagramPacket lookupRequest = createJobManagerLookupRequestPacket(magicNumber);

for (InetAddress broadcast : targetAddresses) {
lookupRequest.setAddress(broadcast);
lookupRequest.setPort(DISCOVERYPORT);
lookupRequest.setPort(discoveryPort);
LOG.debug("Sending discovery request to " + lookupRequest.getSocketAddress());
socket.send(lookupRequest);
}
Expand All @@ -462,7 +492,7 @@ public static InetSocketAddress getJobManagerAddress() throws DiscoveryException
continue;
}

if (!isPacketForUs(responsePacket)) {
if (!isPacketForUs(responsePacket, magicNumber)) {
LOG.debug("Received packet which is not destined to this Nephele setup");
continue;
}
Expand Down Expand Up @@ -659,7 +689,7 @@ public void run() {
try {
this.serverSocket.receive(requestPacket);

if (!isPacketForUs(requestPacket)) {
if (!isPacketForUs(requestPacket, this.magicNumber)) {
LOG.debug("Received request packet which is not destined to this Nephele setup");
continue;
}
Expand Down Expand Up @@ -689,7 +719,8 @@ public void run() {
if (packetTypeID == JM_LOOKUP_REQUEST_ID) {

LOG.debug("Received job manager lookup request from " + requestPacket.getSocketAddress());
final DatagramPacket responsePacket = createJobManagerLookupReplyPacket(this.ipcPort);
final DatagramPacket responsePacket = createJobManagerLookupReplyPacket(this.ipcPort,
this.magicNumber);
responsePacket.setAddress(requestPacket.getAddress());
responsePacket.setPort(requestPacket.getPort());

Expand All @@ -698,7 +729,7 @@ public void run() {
} else if (packetTypeID == TM_ADDRESS_REQUEST_ID) {
LOG.debug("Received task manager address request from " + requestPacket.getSocketAddress());
final DatagramPacket responsePacket = createTaskManagerAddressReplyPacket(requestPacket
.getAddress());
.getAddress(), this.magicNumber);
responsePacket.setAddress(requestPacket.getAddress());
responsePacket.setPort(requestPacket.getPort());

Expand Down Expand Up @@ -765,10 +796,12 @@ private static int byteArrayToInteger(final byte[] byteArray, final int offset)
*
* @param packet
* the packet to check
* @param magicNumber
* the magic number identifying the discovery service
* @return <code>true</code> if the packet carries the magic number expected by the local service, otherwise
* <code>false</code>
*/
private static boolean isPacketForUs(final DatagramPacket packet) {
private static boolean isPacketForUs(final DatagramPacket packet, final int magicNumber) {

final byte[] data = packet.getData();

Expand All @@ -780,8 +813,7 @@ private static boolean isPacketForUs(final DatagramPacket packet) {
return false;
}

if (byteArrayToInteger(data, MAGIC_NUMBER_OFFSET) != GlobalConfiguration.getInteger(MAGICNUMBER_KEY,
DEFAULT_MAGICNUMBER_VALUE)) {
if (byteArrayToInteger(data, MAGIC_NUMBER_OFFSET) != magicNumber) {
return false;
}

Expand Down

0 comments on commit 397bf1d

Please sign in to comment.