Skip to content

Commit

Permalink
Improved efficiency of mechanism to release instances
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 7, 2012
1 parent 807fbc1 commit 70fdcd0
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -789,33 +789,6 @@ public ExecutionEdge getEdgeByID(final ChannelID id) {
return this.edgeMap.get(id);
}

/**
* Returns a (possibly empty) list of execution vertices which are currently assigned to the
* given allocated resource. The vertices in that list may have an arbitrary execution state.
*
* @param allocatedResource
* the allocated resource to check the assignment for
* @return a (possibly empty) list of execution vertices which are currently assigned to the given instance
*/
public List<ExecutionVertex> getVerticesAssignedToResource(final AllocatedResource allocatedResource) {

final List<ExecutionVertex> list = new ArrayList<ExecutionVertex>();

if (allocatedResource == null) {
return list;
}

final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(this, true);
while (it.hasNext()) {
final ExecutionVertex vertex = it.next();
if (allocatedResource.equals(vertex.getAllocatedResource())) {
list.add(vertex);
}
}

return list;
}

/**
* Registers an execution vertex with the execution graph.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public final class ExecutionVertex {
/**
* The allocated resources assigned to this vertex.
*/
private volatile AllocatedResource allocatedResource = null;
private final AtomicReference<AllocatedResource> allocatedResource = new AtomicReference<AllocatedResource>(null);

/**
* The allocation ID identifying the allocated resources used by this vertex
Expand Down Expand Up @@ -255,7 +255,7 @@ public ExecutionVertex duplicateVertex(final boolean preserveVertexID) {
duplicatedVertex.checkpointState.set(this.checkpointState.get());

// TODO set new profiling record with new vertex id
duplicatedVertex.setAllocatedResource(this.allocatedResource);
duplicatedVertex.setAllocatedResource(this.allocatedResource.get());

return duplicatedVertex;
}
Expand Down Expand Up @@ -475,7 +475,12 @@ public void setAllocatedResource(final AllocatedResource allocatedResource) {
throw new IllegalArgumentException("Argument allocatedResource must not be null");
}

this.allocatedResource = allocatedResource;
final AllocatedResource previousResource = this.allocatedResource.getAndSet(allocatedResource);
if (previousResource != null) {
previousResource.removeVertexFromResource(this);
}

allocatedResource.assignVertexToResource(this);

// Notify all listener objects
final Iterator<VertexAssignmentListener> it = this.vertexAssignmentListeners.iterator();
Expand All @@ -490,7 +495,8 @@ public void setAllocatedResource(final AllocatedResource allocatedResource) {
* @return the allocated resources assigned to this execution vertex
*/
public AllocatedResource getAllocatedResource() {
return this.allocatedResource;

return this.allocatedResource.get();
}

/**
Expand Down Expand Up @@ -688,7 +694,9 @@ public ExecutionGate getInputGate(final int index) {
*/
public TaskSubmissionResult startTask() {

if (this.allocatedResource == null) {
final AllocatedResource ar = this.allocatedResource.get();

if (ar == null) {
final TaskSubmissionResult result = new TaskSubmissionResult(getID(),
AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
Expand All @@ -699,7 +707,7 @@ public TaskSubmissionResult startTask() {
tasks.add(constructDeploymentDescriptor());

try {
final List<TaskSubmissionResult> results = this.allocatedResource.getInstance().submitTasks(tasks);
final List<TaskSubmissionResult> results = ar.getInstance().submitTasks(tasks);

return results.get(0);

Expand Down Expand Up @@ -729,14 +737,16 @@ public TaskKillResult killTask() {
return result;
}

if (this.allocatedResource == null) {
final AllocatedResource ar = this.allocatedResource.get();

if (ar == null) {
final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
return result;
}

try {
return this.allocatedResource.getInstance().killTask(this.vertexID);
return ar.getInstance().killTask(this.vertexID);
} catch (IOException e) {
final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
result.setDescription(StringUtils.stringifyException(e));
Expand All @@ -746,15 +756,17 @@ public TaskKillResult killTask() {

public TaskCheckpointResult requestCheckpointDecision() {

if (this.allocatedResource == null) {
final AllocatedResource ar = this.allocatedResource.get();

if (ar == null) {
final TaskCheckpointResult result = new TaskCheckpointResult(getID(),
AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
return result;
}

try {
return this.allocatedResource.getInstance().requestCheckpointDecision(this.vertexID);
return ar.getInstance().requestCheckpointDecision(this.vertexID);

} catch (IOException e) {
final TaskCheckpointResult result = new TaskCheckpointResult(getID(),
Expand Down Expand Up @@ -826,15 +838,17 @@ public TaskCancelResult cancelTask() {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

if (this.allocatedResource == null) {
final AllocatedResource ar = this.allocatedResource.get();

if (ar == null) {
final TaskCancelResult result = new TaskCancelResult(getID(),
AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
return result;
}

try {
return this.allocatedResource.getInstance().cancelTask(this.vertexID);
return ar.getInstance().cancelTask(this.vertexID);

} catch (IOException e) {
final TaskCancelResult result = new TaskCancelResult(getID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package eu.stratosphere.nephele.instance;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -168,6 +169,16 @@ public void assignVertexToResource(final ExecutionVertex vertex) {
}
}

/**
* Returns an iterator over all execution vertices currently assigned to this allocated resource.
*
* @return an iterator over all execution vertices currently assigned to this allocated resource
*/
public Iterator<ExecutionVertex> assignedVertices() {

return this.assignedVertices.iterator();
}

/**
* Removes the given execution vertex from this allocated resource.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,8 @@ public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph
return;
}

final List<ExecutionVertex> assignedVertices = executionGraph
.getVerticesAssignedToResource(allocatedResource);
if (assignedVertices.isEmpty()) {
return;
}

boolean resourceCanBeReleased = true;
Iterator<ExecutionVertex> it = assignedVertices.iterator();
final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
while (it.hasNext()) {
final ExecutionVertex vertex = it.next();
final ExecutionState state = vertex.getExecutionState();
Expand Down Expand Up @@ -587,8 +581,7 @@ public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResour
return;
}

final List<ExecutionVertex> vertices = executionGraph.getVerticesAssignedToResource(allocatedResource);
Iterator<ExecutionVertex> vertexIter = vertices.iterator();
Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();

// Assign vertices back to a dummy resource.
final DummyInstance dummyInstance = DummyInstance.createDummyInstance(allocatedResource.getInstance()
Expand All @@ -603,7 +596,7 @@ public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResour

final String failureMessage = allocatedResource.getInstance().getName() + " died";

vertexIter = vertices.iterator();
vertexIter = allocatedResource.assignedVertices();

while (vertexIter.hasNext()) {
final ExecutionVertex vertex = vertexIter.next();
Expand Down

0 comments on commit 70fdcd0

Please sign in to comment.