Skip to content

Commit

Permalink
Replaced slow implementation of ID-to-vertex lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 7, 2012
1 parent 2fef62f commit 1fa6eae
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public class ExecutionGraph implements ExecutionListener {
*/
private final String jobName;

/**
* Mapping of vertex IDs to vertices.
*/
private final ConcurrentMap<ExecutionVertexID, ExecutionVertex> vertexMap = new ConcurrentHashMap<ExecutionVertexID, ExecutionVertex>(
1024);

/**
* Mapping of channel IDs to edges.
*/
Expand Down Expand Up @@ -563,8 +569,7 @@ private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final In
+ ": " + StringUtils.stringifyException(e));
}
} else {
throw new GraphConversionException(
"BUG: JobInputVertex contained a task class which was not an input task.");
throw new GraphConversionException("JobInputVertex contained a task class which was not an input task.");
}

if (inputSplits == null) {
Expand Down Expand Up @@ -811,23 +816,31 @@ public List<ExecutionVertex> getVerticesAssignedToResource(final AllocatedResour
return list;
}

public ExecutionVertex getVertexByID(final ExecutionVertexID id) {
/**
* Registers an execution vertex with the execution graph.
*
* @param vertex
* the execution vertex to register
*/
void registerExecutionVertex(final ExecutionVertex vertex) {

if (id == null) {
return null;
if (this.vertexMap.put(vertex.getID(), vertex) != null) {
throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID()
+ " registered");
}
}

final ExecutionGraphIterator it = new ExecutionGraphIterator(this, true);

while (it.hasNext()) {

final ExecutionVertex vertex = it.next();
if (vertex.getID().equals(id)) {
return vertex;
}
}
/**
* Returns the execution vertex with the given vertex ID.
*
* @param id
* the vertex ID to retrieve the execution vertex
* @return the execution vertex matching the provided vertex ID or <code>null</code> if no such vertex could be
* found
*/
public ExecutionVertex getVertexByID(final ExecutionVertexID id) {

return null;
return this.vertexMap.get(id);
}

/**
Expand Down Expand Up @@ -1234,6 +1247,7 @@ public void executionStateChanged(final JobID jobID, final ExecutionVertexID ver
final ExecutionVertex vertex = getVertexByID(vertexID);
if (vertex == null) {
LOG.error("Cannot find execution vertex with the ID " + vertexID);
return;
}

final ExecutionState actualExecutionState = vertex.getExecutionState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@

/**
* An ExecutionGroupVertex is created for every JobVertex of the initial job graph. It represents a number of execution
* vertices
* that originate from the same job vertex.
* vertices that originate from the same job vertex.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public class ExecutionGroupVertex {
public final class ExecutionGroupVertex {

/**
* The default number of retries in case of an error before the task represented by this vertex is considered as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ private ExecutionVertex(final ExecutionVertexID vertexID, final ExecutionGraph e
this.outputGates = new ExecutionGate[numberOfOutputGates];
this.inputGates = new ExecutionGate[numberOfInputGates];

// Register vertex with execution graph
this.executionGraph.registerExecutionVertex(this);

// Register the vertex itself as a listener for state changes
registerExecutionListener(this.executionGraph);
}
Expand Down

0 comments on commit 1fa6eae

Please sign in to comment.