Skip to content

Commit

Permalink
Minor style changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jul 18, 2012
1 parent fc3a158 commit 34b7493
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,62 @@
* @author casp
*/

public class MulticastManager implements ChannelLookupProtocol {
public final class MulticastManager implements ChannelLookupProtocol {

/**
* The log object used to report errors and warnings.
*/
private static final Log LOG = LogFactory.getLog(JobManager.class);

// Indicates if the arrangement of nodes within the overlay-tree should be randomized or not.
// If set to false, arrangement of the same set of receiver nodes is guaranteed to be the same
/**
* Indicates if the arrangement of nodes within the overlay-tree should be randomized or not. If set to false,
* arrangement of the same set of receiver nodes is guaranteed to be the same
*/
private final boolean randomized;

// Indicates if the tree should be constructed with a given topology stored in a file
private final boolean usehardcodedtree;
/**
* Indicates if the tree should be constructed with a given topology stored in a file.
*/
private final boolean useHardCodedTree;

// File containing the hard-coded tree topology, if desired
// Should contain node names (eg hostnames) with corresponding children per line
// eg: a line "vm1.local vm2.local vm3.local"
// would result in vm1.local connecting to vm2.local and vm3.local as children
// no further checking for connectivity of the given topology is done!
private final String hardcodedtreefilepath;
/**
* File containing the hard-coded tree topology, if desired should contain node names (e.g. hostnames) with
* corresponding children per line.
* For example, a line "vm1.local vm2.local vm3.local" would result in vm1.local connecting to vm2.local and
* vm3.local as children no further checking for connectivity of the given topology is done!
*/
private final String hardCodedTreeFilePath;

// Indicates the desired branching of the generated multicast-tree. 0 means unicast transmisison, 1 sequential tree
// 2 binomial tree, 3+ clustered tree
private final int treebranching;
/**
* Indicates the desired branching of the generated multicast-tree. 0 means unicast transmisison, 1 sequential tree,
* 2 binomial tree, 3+ clustered tree
*/
private final int treeBranching;

/**
* Reference to the scheduler.
*/
private final AbstractScheduler scheduler;

/**
* Map caching already computed multicast forwarding tables.
*/
private final Map<ChannelID, MulticastForwardingTable> cachedTrees = new HashMap<ChannelID, MulticastForwardingTable>();

/**
* Constructs a new multicast manager.
*
* @param scheduler
* reference to the scheduler
*/
public MulticastManager(final AbstractScheduler scheduler) {

this.scheduler = scheduler;

this.randomized = GlobalConfiguration.getBoolean("multicast.randomize", false);
this.treebranching = GlobalConfiguration.getInteger("multicast.branching", 1);
this.usehardcodedtree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false);
this.hardcodedtreefilepath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null);
this.treeBranching = GlobalConfiguration.getInteger("multicast.branching", 1);
this.useHardCodedTree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false);
this.hardCodedTreeFilePath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null);
}

/**
Expand All @@ -98,37 +121,41 @@ public MulticastManager(final AbstractScheduler scheduler) {
* the ID of the channel to resolve
* @return the lookup response containing the connection info and a return code
*/
public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID,
ChannelID sourceChannelID) {
public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller,
final JobID jobID, final ChannelID sourceChannelID) {

LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID);
if (LOG.isInfoEnabled()) {
LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID);
}

// check, if the tree is already created and cached
// Check if the tree is already created and cached
if (this.cachedTrees.containsKey(sourceChannelID)) {

LOG.info("Replying with cached entry...");
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);

} else {

// no tree exists - we assume that this is the sending node initiating a multicast
// No tree exists, so we assume that this is the sending node initiating a multicast

if (!checkIfAllTargetVerticesReady(caller, jobID, sourceChannelID)) {
LOG.info("Received multicast request but not all receivers ready.");

return ConnectionInfoLookupResponse.createReceiverNotReady();
}

// receivers up and running.. extract tree nodes...
LinkedList<TreeNode> treenodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized);
// Receivers are up and running.. extract tree nodes...
LinkedList<TreeNode> treeNodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized);

// Do we want to use a hard-coded tree topology?
if (this.usehardcodedtree) {
LOG.info("Creating a hard-coded tree topology from file: " + hardcodedtreefilepath);
cachedTrees.put(sourceChannelID, createHardCodedTree(treenodes));
if (this.useHardCodedTree) {
LOG.info("Creating a hard-coded tree topology from file: " + hardCodedTreeFilePath);
cachedTrees.put(sourceChannelID, createHardCodedTree(treeNodes));
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
}

// Otherwise we create a default tree and put it into the tree-cache
cachedTrees.put(sourceChannelID, createDefaultTree(treenodes, this.treebranching));
cachedTrees.put(sourceChannelID, createDefaultTree(treeNodes, this.treeBranching));
return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);

}
Expand All @@ -142,7 +169,7 @@ public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceCo
* @param nodes
* @return
*/
private TreeNode pollClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes) {
private TreeNode pollClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {

TreeNode closestnode = getClosestNode(indicator, nodes);

Expand All @@ -161,20 +188,20 @@ private TreeNode pollClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes)
* @param nodes
* @return
*/
private TreeNode getClosestNode(TreeNode indicator, LinkedList<TreeNode> nodes) {
private TreeNode getClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {

if (indicator == null) {
return nodes.getFirst();
}

TreeNode closestnode = null;
TreeNode closestNode = null;
for (TreeNode n : nodes) {
if (closestnode == null || n.getDistance(indicator) < closestnode.getDistance(indicator)) {
closestnode = n;
if (closestNode == null || n.getDistance(indicator) < closestNode.getDistance(indicator)) {
closestNode = n;
}
}

return closestnode;
return closestNode;
}

/**
Expand Down Expand Up @@ -231,7 +258,7 @@ private MulticastForwardingTable createDefaultTree(LinkedList<TreeNode> nodes, i
*/
private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes) {
try {
FileInputStream fstream = new FileInputStream(this.hardcodedtreefilepath);
FileInputStream fstream = new FileInputStream(this.hardCodedTreeFilePath);
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String strLine;
Expand Down Expand Up @@ -271,6 +298,7 @@ private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes)
* @return
*/
private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {

final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);

final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);
Expand Down Expand Up @@ -320,13 +348,13 @@ private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, Job
* @param sourceChannelID
* @return
*/
private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, JobID jobID,
ChannelID sourceChannelID, boolean randomize) {
private LinkedList<TreeNode> extractTreeNodes(final InstanceConnectionInfo source, final JobID jobID,
final ChannelID sourceChannelID, final boolean randomize) {

final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);

final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);

final ExecutionGate broadcastGate = outputChannel.getOutputGate();

final LinkedList<ExecutionEdge> outputChannels = new LinkedList<ExecutionEdge>();
Expand Down Expand Up @@ -354,7 +382,7 @@ private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, Job
// the connection ID should not be needed for the root node (as it is not set as remote receiver)
// but in order to maintain consistency, it also gets the connectionID of the first channel pointing to it
firstConnectionID = actualOutputChannel.getConnectionID();

final ExecutionVertex targetVertex = actualOutputChannel.getInputGate().getVertex();

// is the target vertex running on the same instance?
Expand All @@ -374,12 +402,12 @@ private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, Job

// now we have the root-node.. lets extract all other nodes

LinkedList<TreeNode> receivernodes = new LinkedList<TreeNode>();
LinkedList<TreeNode> receiverNodes = new LinkedList<TreeNode>();

while (outputChannels.size() > 0) {

final ExecutionEdge firstChannel = outputChannels.pollFirst();

// each receiver nodes' endpoint is associated with the connection ID
// of the first channel pointing to this node.
final int connectionID = firstChannel.getConnectionID();
Expand All @@ -405,7 +433,7 @@ private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, Job
if (actualTarget.getAllocatedResource().getInstance().getInstanceConnectionInfo()
.equals(actualInstance)) {
actualLocalTargets.add(actualOutputChannel.getInputChannelID());

iter.remove();

}
Expand All @@ -415,22 +443,21 @@ private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, Job
// create tree node for current instance
actualNode = new TreeNode(firstTarget.getAllocatedResource().getInstance(), actualInstance, connectionID,
actualLocalTargets);


receivernodes.add(actualNode);
receiverNodes.add(actualNode);

}// end while

// Do we want to shuffle the receiver nodes?
// Only randomize the receivers, as the sender (the first one) has to stay the same
if (randomize) {
Collections.shuffle(receivernodes);
Collections.shuffle(receiverNodes);
} else {
// Sort Tree Nodes according to host name..
Collections.sort(receivernodes);
Collections.sort(receiverNodes);
}

treeNodes.addAll(receivernodes);
treeNodes.addAll(receiverNodes);

return treeNodes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;

/**
* Each physical Node (instance) within a multicast-tree is represented by a TreeNode object.
* Each physical node (instance) within a multicast tree is represented by a TreeNode object.
* It contains the connection info for the certain node and a list of the local output channels.
*
* @author casp
Expand All @@ -40,7 +40,7 @@ public class TreeNode implements Comparable<TreeNode> {
private final InstanceConnectionInfo nodeConnectionInfo;

private final int connectionID;

private final LinkedList<ChannelID> localTargets;

private final LinkedList<TreeNode> children = new LinkedList<TreeNode>();
Expand Down Expand Up @@ -103,8 +103,8 @@ public TreeNode getParent() {
private InstanceConnectionInfo getConnectionInfo() {
return this.nodeConnectionInfo;
}
private int getConnectionID(){

private int getConnectionID() {
return this.connectionID;
}

Expand Down Expand Up @@ -161,33 +161,33 @@ public MulticastForwardingTable createForwardingTable() {
* @param table
*/
private void generateRecursiveForwardingTable(MulticastForwardingTable table) {
ConnectionInfoLookupResponse actualentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady();

final ConnectionInfoLookupResponse lookupResponse = ConnectionInfoLookupResponse.createReceiverFoundAndReady();

// add local targets
for (ChannelID i : this.localTargets) {
actualentry.addLocalTarget(i);
for (final ChannelID i : this.localTargets) {
lookupResponse.addLocalTarget(i);
}

// add remote targets
for (TreeNode n : this.children) {
for (final TreeNode n : this.children) {

// Instance Connection info associated with the remote target
final InstanceConnectionInfo ici = n.getConnectionInfo();

// get the connection ID associated with the remote target endpoint
final int icid = n.getConnectionID();

final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());

actualentry.addRemoteTarget(new RemoteReceiver(isa, icid));
lookupResponse.addRemoteTarget(new RemoteReceiver(isa, icid));
}

table.addConnectionInfo(this.nodeConnectionInfo, actualentry);
table.addConnectionInfo(this.nodeConnectionInfo, lookupResponse);

for (TreeNode n : this.children) {
for (final TreeNode n : this.children) {
n.generateRecursiveForwardingTable(table);
}

}

/**
Expand All @@ -196,12 +196,14 @@ private void generateRecursiveForwardingTable(MulticastForwardingTable table) {
* @return
*/
public String printTree() {

StringBuilder sb = new StringBuilder();
this.printRecursiveTree(sb);
return sb.toString();
}

private void printRecursiveTree(StringBuilder sb) {

if (this.children.size() > 0) {
sb.append("STRUCT ");

Expand All @@ -220,12 +222,13 @@ private void printRecursiveTree(StringBuilder sb) {
}
}

private class IntegerProperty {
private static class IntegerProperty {

private String key = null;

private int value = 0;

public IntegerProperty(String key, int value) {
public IntegerProperty(final String key, final int value) {
this.key = key;
this.value = value;
}
Expand All @@ -238,7 +241,7 @@ public int getValue() {
return this.value;
}

public void setValue(int value) {
public void setValue(final int value) {
this.value = value;
}
}
Expand Down

0 comments on commit 34b7493

Please sign in to comment.