From 34b74936291c345c0b7262cf7e38552565ff9267 Mon Sep 17 00:00:00 2001 From: Daniel Warneke Date: Wed, 18 Jul 2012 19:11:02 +0200 Subject: [PATCH] Minor style changes --- .../nephele/multicast/MulticastManager.java | 121 +++++++++++------- .../nephele/multicast/TreeNode.java | 37 +++--- 2 files changed, 94 insertions(+), 64 deletions(-) diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java index 8530b21a221ca..6464148f1f441 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java @@ -51,39 +51,62 @@ * @author casp */ -public class MulticastManager implements ChannelLookupProtocol { +public final class MulticastManager implements ChannelLookupProtocol { + /** + * The log object used to report errors and warnings. + */ private static final Log LOG = LogFactory.getLog(JobManager.class); - // Indicates if the arrangement of nodes within the overlay-tree should be randomized or not. - // If set to false, arrangement of the same set of receiver nodes is guaranteed to be the same + /** + * Indicates if the arrangement of nodes within the overlay-tree should be randomized or not. If set to false, + * arrangement of the same set of receiver nodes is guaranteed to be the same + */ private final boolean randomized; - // Indicates if the tree should be constructed with a given topology stored in a file - private final boolean usehardcodedtree; + /** + * Indicates if the tree should be constructed with a given topology stored in a file. + */ + private final boolean useHardCodedTree; - // File containing the hard-coded tree topology, if desired - // Should contain node names (eg hostnames) with corresponding children per line - // eg: a line "vm1.local vm2.local vm3.local" - // would result in vm1.local connecting to vm2.local and vm3.local as children - // no further checking for connectivity of the given topology is done! - private final String hardcodedtreefilepath; + /** + * File containing the hard-coded tree topology, if desired should contain node names (e.g. hostnames) with + * corresponding children per line. + * For example, a line "vm1.local vm2.local vm3.local" would result in vm1.local connecting to vm2.local and + * vm3.local as children no further checking for connectivity of the given topology is done! + */ + private final String hardCodedTreeFilePath; - // Indicates the desired branching of the generated multicast-tree. 0 means unicast transmisison, 1 sequential tree - // 2 binomial tree, 3+ clustered tree - private final int treebranching; + /** + * Indicates the desired branching of the generated multicast-tree. 0 means unicast transmisison, 1 sequential tree, + * 2 binomial tree, 3+ clustered tree + */ + private final int treeBranching; + /** + * Reference to the scheduler. + */ private final AbstractScheduler scheduler; + /** + * Map caching already computed multicast forwarding tables. + */ private final Map cachedTrees = new HashMap(); + /** + * Constructs a new multicast manager. + * + * @param scheduler + * reference to the scheduler + */ public MulticastManager(final AbstractScheduler scheduler) { + this.scheduler = scheduler; this.randomized = GlobalConfiguration.getBoolean("multicast.randomize", false); - this.treebranching = GlobalConfiguration.getInteger("multicast.branching", 1); - this.usehardcodedtree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false); - this.hardcodedtreefilepath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null); + this.treeBranching = GlobalConfiguration.getInteger("multicast.branching", 1); + this.useHardCodedTree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false); + this.hardCodedTreeFilePath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null); } /** @@ -98,18 +121,22 @@ public MulticastManager(final AbstractScheduler scheduler) { * the ID of the channel to resolve * @return the lookup response containing the connection info and a return code */ - public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, - ChannelID sourceChannelID) { + public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller, + final JobID jobID, final ChannelID sourceChannelID) { - LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID); + if (LOG.isInfoEnabled()) { + LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID); + } - // check, if the tree is already created and cached + // Check if the tree is already created and cached if (this.cachedTrees.containsKey(sourceChannelID)) { + LOG.info("Replying with cached entry..."); return cachedTrees.get(sourceChannelID).getConnectionInfo(caller); + } else { - // no tree exists - we assume that this is the sending node initiating a multicast + // No tree exists, so we assume that this is the sending node initiating a multicast if (!checkIfAllTargetVerticesReady(caller, jobID, sourceChannelID)) { LOG.info("Received multicast request but not all receivers ready."); @@ -117,18 +144,18 @@ public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceCo return ConnectionInfoLookupResponse.createReceiverNotReady(); } - // receivers up and running.. extract tree nodes... - LinkedList treenodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized); + // Receivers are up and running.. extract tree nodes... + LinkedList treeNodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized); // Do we want to use a hard-coded tree topology? - if (this.usehardcodedtree) { - LOG.info("Creating a hard-coded tree topology from file: " + hardcodedtreefilepath); - cachedTrees.put(sourceChannelID, createHardCodedTree(treenodes)); + if (this.useHardCodedTree) { + LOG.info("Creating a hard-coded tree topology from file: " + hardCodedTreeFilePath); + cachedTrees.put(sourceChannelID, createHardCodedTree(treeNodes)); return cachedTrees.get(sourceChannelID).getConnectionInfo(caller); } // Otherwise we create a default tree and put it into the tree-cache - cachedTrees.put(sourceChannelID, createDefaultTree(treenodes, this.treebranching)); + cachedTrees.put(sourceChannelID, createDefaultTree(treeNodes, this.treeBranching)); return cachedTrees.get(sourceChannelID).getConnectionInfo(caller); } @@ -142,7 +169,7 @@ public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceCo * @param nodes * @return */ - private TreeNode pollClosestNode(TreeNode indicator, LinkedList nodes) { + private TreeNode pollClosestNode(final TreeNode indicator, final LinkedList nodes) { TreeNode closestnode = getClosestNode(indicator, nodes); @@ -161,20 +188,20 @@ private TreeNode pollClosestNode(TreeNode indicator, LinkedList nodes) * @param nodes * @return */ - private TreeNode getClosestNode(TreeNode indicator, LinkedList nodes) { + private TreeNode getClosestNode(final TreeNode indicator, final LinkedList nodes) { if (indicator == null) { return nodes.getFirst(); } - TreeNode closestnode = null; + TreeNode closestNode = null; for (TreeNode n : nodes) { - if (closestnode == null || n.getDistance(indicator) < closestnode.getDistance(indicator)) { - closestnode = n; + if (closestNode == null || n.getDistance(indicator) < closestNode.getDistance(indicator)) { + closestNode = n; } } - return closestnode; + return closestNode; } /** @@ -231,7 +258,7 @@ private MulticastForwardingTable createDefaultTree(LinkedList nodes, i */ private MulticastForwardingTable createHardCodedTree(LinkedList nodes) { try { - FileInputStream fstream = new FileInputStream(this.hardcodedtreefilepath); + FileInputStream fstream = new FileInputStream(this.hardCodedTreeFilePath); DataInputStream in = new DataInputStream(fstream); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String strLine; @@ -271,6 +298,7 @@ private MulticastForwardingTable createHardCodedTree(LinkedList nodes) * @return */ private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) { + final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID); @@ -320,13 +348,13 @@ private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, Job * @param sourceChannelID * @return */ - private LinkedList extractTreeNodes(InstanceConnectionInfo source, JobID jobID, - ChannelID sourceChannelID, boolean randomize) { + private LinkedList extractTreeNodes(final InstanceConnectionInfo source, final JobID jobID, + final ChannelID sourceChannelID, final boolean randomize) { final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID); - + final ExecutionGate broadcastGate = outputChannel.getOutputGate(); final LinkedList outputChannels = new LinkedList(); @@ -354,7 +382,7 @@ private LinkedList extractTreeNodes(InstanceConnectionInfo source, Job // the connection ID should not be needed for the root node (as it is not set as remote receiver) // but in order to maintain consistency, it also gets the connectionID of the first channel pointing to it firstConnectionID = actualOutputChannel.getConnectionID(); - + final ExecutionVertex targetVertex = actualOutputChannel.getInputGate().getVertex(); // is the target vertex running on the same instance? @@ -374,12 +402,12 @@ private LinkedList extractTreeNodes(InstanceConnectionInfo source, Job // now we have the root-node.. lets extract all other nodes - LinkedList receivernodes = new LinkedList(); + LinkedList receiverNodes = new LinkedList(); while (outputChannels.size() > 0) { final ExecutionEdge firstChannel = outputChannels.pollFirst(); - + // each receiver nodes' endpoint is associated with the connection ID // of the first channel pointing to this node. final int connectionID = firstChannel.getConnectionID(); @@ -405,7 +433,7 @@ private LinkedList extractTreeNodes(InstanceConnectionInfo source, Job if (actualTarget.getAllocatedResource().getInstance().getInstanceConnectionInfo() .equals(actualInstance)) { actualLocalTargets.add(actualOutputChannel.getInputChannelID()); - + iter.remove(); } @@ -415,22 +443,21 @@ private LinkedList extractTreeNodes(InstanceConnectionInfo source, Job // create tree node for current instance actualNode = new TreeNode(firstTarget.getAllocatedResource().getInstance(), actualInstance, connectionID, actualLocalTargets); - - receivernodes.add(actualNode); + receiverNodes.add(actualNode); }// end while // Do we want to shuffle the receiver nodes? // Only randomize the receivers, as the sender (the first one) has to stay the same if (randomize) { - Collections.shuffle(receivernodes); + Collections.shuffle(receiverNodes); } else { // Sort Tree Nodes according to host name.. - Collections.sort(receivernodes); + Collections.sort(receiverNodes); } - treeNodes.addAll(receivernodes); + treeNodes.addAll(receiverNodes); return treeNodes; diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java index f2fb06081076f..496e25ebbbc7d 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java @@ -25,7 +25,7 @@ import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver; /** - * Each physical Node (instance) within a multicast-tree is represented by a TreeNode object. + * Each physical node (instance) within a multicast tree is represented by a TreeNode object. * It contains the connection info for the certain node and a list of the local output channels. * * @author casp @@ -40,7 +40,7 @@ public class TreeNode implements Comparable { private final InstanceConnectionInfo nodeConnectionInfo; private final int connectionID; - + private final LinkedList localTargets; private final LinkedList children = new LinkedList(); @@ -103,8 +103,8 @@ public TreeNode getParent() { private InstanceConnectionInfo getConnectionInfo() { return this.nodeConnectionInfo; } - - private int getConnectionID(){ + + private int getConnectionID() { return this.connectionID; } @@ -161,33 +161,33 @@ public MulticastForwardingTable createForwardingTable() { * @param table */ private void generateRecursiveForwardingTable(MulticastForwardingTable table) { - ConnectionInfoLookupResponse actualentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady(); + + final ConnectionInfoLookupResponse lookupResponse = ConnectionInfoLookupResponse.createReceiverFoundAndReady(); // add local targets - for (ChannelID i : this.localTargets) { - actualentry.addLocalTarget(i); + for (final ChannelID i : this.localTargets) { + lookupResponse.addLocalTarget(i); } // add remote targets - for (TreeNode n : this.children) { + for (final TreeNode n : this.children) { // Instance Connection info associated with the remote target final InstanceConnectionInfo ici = n.getConnectionInfo(); - + // get the connection ID associated with the remote target endpoint final int icid = n.getConnectionID(); - + final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort()); - actualentry.addRemoteTarget(new RemoteReceiver(isa, icid)); + lookupResponse.addRemoteTarget(new RemoteReceiver(isa, icid)); } - table.addConnectionInfo(this.nodeConnectionInfo, actualentry); + table.addConnectionInfo(this.nodeConnectionInfo, lookupResponse); - for (TreeNode n : this.children) { + for (final TreeNode n : this.children) { n.generateRecursiveForwardingTable(table); } - } /** @@ -196,12 +196,14 @@ private void generateRecursiveForwardingTable(MulticastForwardingTable table) { * @return */ public String printTree() { + StringBuilder sb = new StringBuilder(); this.printRecursiveTree(sb); return sb.toString(); } private void printRecursiveTree(StringBuilder sb) { + if (this.children.size() > 0) { sb.append("STRUCT "); @@ -220,12 +222,13 @@ private void printRecursiveTree(StringBuilder sb) { } } - private class IntegerProperty { + private static class IntegerProperty { + private String key = null; private int value = 0; - public IntegerProperty(String key, int value) { + public IntegerProperty(final String key, final int value) { this.key = key; this.value = value; } @@ -238,7 +241,7 @@ public int getValue() { return this.value; } - public void setValue(int value) { + public void setValue(final int value) { this.value = value; } }