Skip to content

Commit

Permalink
Continue to work on deploy/cancel race problem
Browse files: browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jun 27, 2012
1 parent e1ae38f commit acca0ef
Showing 1 changed file with 61 additions and 51 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -743,74 +743,84 @@ public TaskCheckpointResult requestCheckpointDecision() {
*/
public TaskCancelResult cancelTask() {
// NOTE(review): this listing appears to be a rendered diff whose +/- markers were stripped, so lines of the
// removed and the added revision of this method are interleaved (e.g. previousState is declared twice below).
// It does not compile as shown; read the comments as describing the individual statements they sit next to.

ExecutionState previousState = this.executionState.get();
while (true) {

// The vertex is currently in the middle of the deployment process
int retry = 1000;
while (previousState == ExecutionState.STARTING) {
// Snapshot of the current execution state; the checks below are made against this snapshot.
final ExecutionState previousState = this.executionState.get();

// Give up with ILLEGAL_STATE once the retry budget is used up.
if (--retry == 0) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
if (previousState == ExecutionState.CANCELED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

try {
Thread.sleep(1);
} catch (InterruptedException ie) {
// An interrupt during the sleep ends the wait and is reported as ILLEGAL_STATE; the thread's interrupt
// status is not restored here.
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
if (previousState == ExecutionState.FAILED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

previousState = this.executionState.get();
}

if (previousState == ExecutionState.CANCELED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
if (previousState == ExecutionState.FINISHED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

if (previousState == ExecutionState.FAILED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
// The vertex has already received a cancel request
if (previousState == ExecutionState.CANCELING) {
return new TaskCancelResult(getID(), ReturnCode.SUCCESS);
}

if (previousState == ExecutionState.FINISHED) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
// Do not trigger the cancel request while the vertex is in state STARTING; this might cause a race between
// RPC calls.
if (previousState == ExecutionState.STARTING) {

if (updateExecutionState(ExecutionState.CANCELING) != ExecutionState.CANCELING) {
// Poll in 1 ms steps, at most 2000 times, until the state is no longer STARTING.
int retry = 2000;
while (this.executionState.get() == ExecutionState.STARTING) {

if (this.groupVertex.getStageNumber() != this.executionGraph.getIndexOfCurrentExecutionStage()) {
// Set to canceled directly
updateExecutionState(ExecutionState.CANCELED, null);
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
// Give up with ILLEGAL_STATE once the retry budget is used up.
if (--retry == 0) {
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
}

if (previousState == ExecutionState.FINISHED || previousState == ExecutionState.FAILED) {
// Ignore this call
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
// An interrupt during the sleep ends the wait and is reported as ILLEGAL_STATE; the thread's interrupt
// status is not restored here.
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
}

if (previousState != ExecutionState.RUNNING && previousState != ExecutionState.STARTING
&& previousState != ExecutionState.FINISHING && previousState != ExecutionState.REPLAYING) {
// Set to canceled directly
updateExecutionState(ExecutionState.CANCELED, null);
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}
}

if (this.allocatedResource == null) {
final TaskCancelResult result = new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
return result;
// The vertex state has changed, reevaluate what to do
continue;
}

try {
return this.allocatedResource.getInstance().cancelTask(this.vertexID);

} catch (IOException e) {
final TaskCancelResult result = new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
result.setDescription(StringUtils.stringifyException(e));
return result;
// Check if we had a race. If state change is accepted, send cancel request
// NOTE(review): compareAndUpdateExecutionState is defined outside this view; presumably it applies the
// transition only if the state still equals previousState -- confirm against its definition.
if (compareAndUpdateExecutionState(previousState, ExecutionState.CANCELING)) {

if (this.groupVertex.getStageNumber() != this.executionGraph.getIndexOfCurrentExecutionStage()) {
// Set to canceled directly
updateExecutionState(ExecutionState.CANCELED, null);
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

if (previousState != ExecutionState.RUNNING && previousState != ExecutionState.FINISHING
&& previousState != ExecutionState.REPLAYING) {
// Set to canceled directly
updateExecutionState(ExecutionState.CANCELED, null);
return new TaskCancelResult(getID(), AbstractTaskResult.ReturnCode.SUCCESS);
}

// Without an allocated resource there is no instance to send the cancel request to.
if (this.allocatedResource == null) {
final TaskCancelResult result = new TaskCancelResult(getID(),
AbstractTaskResult.ReturnCode.NO_INSTANCE);
result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
return result;
}

// Forward the cancel request to the assigned instance; an IOException is reported as IPC_ERROR with the
// stringified exception as description.
try {
return this.allocatedResource.getInstance().cancelTask(this.vertexID);

} catch (IOException e) {
final TaskCancelResult result = new TaskCancelResult(getID(),
AbstractTaskResult.ReturnCode.IPC_ERROR);
result.setDescription(StringUtils.stringifyException(e));
return result;
}
}
}

return new TaskCancelResult(getID(), ReturnCode.SUCCESS);
}

/**
Expand Down

0 comments on commit acca0ef

Please sign in to comment.