Skip to content

Commit

Permalink
Simplified thread model for updates to the execution graph
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Aug 12, 2012
1 parent f79b81e commit 15f6d3a
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -99,6 +101,11 @@ public class ExecutionGraph implements ExecutionListener {
*/
private final CopyOnWriteArrayList<ExecutionStage> stages = new CopyOnWriteArrayList<ExecutionStage>();

/**
* The executor service to asynchronously perform update operations to this graph.
*/
private final ExecutorService executorService = Executors.newSingleThreadExecutor();

/**
* Index to the current execution stage.
*/
Expand Down Expand Up @@ -1419,4 +1426,15 @@ public int getPriority() {

return 1;
}

/**
* Performs an asynchronous update operation to this execution graph.
*
* @param command
* the update command to be asynchronously executed on this graph
*/
public void executeCommand(final Runnable command) {

this.executorService.execute(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,43 @@ public ExecutionState getExecutionState() {
return this.executionState.get();
}

/**
* Updates the vertex's current execution state through the job's executor service.
*
* @param newExecutionState
* the new execution state
* @param optionalMessage
* an optional message related to the state change
*/
public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState,
final String optionalMessage) {

final Runnable command = new Runnable() {

/**
* {@inheritDoc}
*/
@Override
public void run() {

updateExecutionState(newExecutionState, optionalMessage);
}
};

this.executionGraph.executeCommand(command);
}

/**
* Updates the vertex's current execution state through the job's executor service.
*
* @param newExecutionState
* the new execution state
*/
public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState) {

updateExecutionStateAsynchronously(newExecutionState, null);
}

/**
* Updates the vertex's current execution state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand Down Expand Up @@ -165,7 +166,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol

private final static int FAILURERETURNCODE = -1;

private boolean isShutDown = false;
private final AtomicBoolean isShutDown = new AtomicBoolean(false);

/**
* Constructs a new job manager, starts its discovery service and its IPC service.
Expand Down Expand Up @@ -302,9 +303,9 @@ public void runTaskLoop() {
}
}

public synchronized void shutdown() {
public void shutdown() {

if (this.isShutDown) {
if (this.isShutDown.compareAndSet(false, true)) {
return;
}

Expand Down Expand Up @@ -354,7 +355,7 @@ public synchronized void shutdown() {
this.scheduler.shutdown();
}

this.isShutDown = true;
this.isShutDown.set(true);
LOG.debug("Shutdown of job manager completed");
}

Expand Down Expand Up @@ -672,18 +673,8 @@ public void updateTaskExecutionState(final TaskExecutionState executionState) th
return;
}

final Runnable taskStateChangeRunnable = new Runnable() {

@Override
public void run() {

// The registered listeners of the vertex will make sure the appropriate actions are taken
vertex.updateExecutionState(executionState.getExecutionState(), executionState.getDescription());
}
};

// Hand over to the executor service, as this may result in a longer operation with several IPC operations
this.executorService.execute(taskStateChangeRunnable);
// Asynchronously update execute state of vertex
vertex.updateExecutionStateAsynchronously(executionState.getExecutionState(), executionState.getDescription());
}

/**
Expand All @@ -710,7 +701,8 @@ public void run() {
}
}
};
this.executorService.execute(cancelJobRunnable);

eg.executeCommand(cancelJobRunnable);

LOG.info("Cancel of job " + jobID + " successfully triggered");

Expand Down Expand Up @@ -843,7 +835,19 @@ public ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectio
&& executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {

if (executionState == ExecutionState.ASSIGNED) {
this.scheduler.deployAssignedVertices(targetVertex);

final Runnable command = new Runnable() {

/**
* {@inheritDoc}
*/
@Override
public void run() {
scheduler.deployAssignedVertices(targetVertex);
}
};

eg.executeCommand(command);
}

// LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
Expand Down Expand Up @@ -978,8 +982,7 @@ public void run() {
}
};

// Hand it over to the executor service
this.executorService.execute(runnable);
eg.executeCommand(runnable);
}

/**
Expand Down Expand Up @@ -1013,7 +1016,7 @@ public void run() {
}

/**
* Collects all vertices with checkpoints from the given execution graph and advices the corresponding task managers
* Collects all vertices with checkpoints from the given execution graph and advises the corresponding task managers
* to remove those checkpoints.
*
* @param executionGraph
Expand Down Expand Up @@ -1084,9 +1087,9 @@ public void run() {
*
* @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
*/
public synchronized boolean isShutDown() {
public boolean isShutDown() {

return this.isShutDown;
return this.isShutDown.get();
}

/**
Expand Down Expand Up @@ -1235,7 +1238,7 @@ public void run() {
} catch (final IOException ioe) {
final String errorMsg = StringUtils.stringifyException(ioe);
for (final ExecutionVertex vertex : verticesToBeDeployed) {
vertex.updateExecutionState(ExecutionState.FAILED, errorMsg);
vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMsg);
}
}

Expand Down Expand Up @@ -1265,7 +1268,7 @@ public void run() {

if (tsr.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
// Change the execution state to failed and let the scheduler deal with the rest
vertex.updateExecutionState(ExecutionState.FAILED, tsr.getDescription());
vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, tsr.getDescription());
}
}
}
Expand Down Expand Up @@ -1303,7 +1306,8 @@ public InputSplitWrapper requestNextInputSplit(final JobID jobID, final Executio
public void updateCheckpointState(final TaskCheckpointState taskCheckpointState) throws IOException {

// Get the graph object for this
final ExecutionGraph executionGraph = this.scheduler.getExecutionGraphByID(taskCheckpointState.getJobID());
final JobID jobID = taskCheckpointState.getJobID();
final ExecutionGraph executionGraph = this.scheduler.getExecutionGraphByID(jobID);
if (executionGraph == null) {
LOG.error("Cannot find execution graph for job " + taskCheckpointState.getJobID()
+ " to update checkpoint state");
Expand All @@ -1327,7 +1331,7 @@ public void run() {
};

// Hand over to the executor service, as this may result in a longer operation with several IPC operations
this.executorService.execute(taskStateChangeRunnable);
executionGraph.executeCommand(taskStateChangeRunnable);
}

/**
Expand Down
Loading

0 comments on commit 15f6d3a

Please sign in to comment.