Skip to content

Commit

Permalink
Implement coarse-grained fault tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Nov 3, 2014
1 parent 2557832 commit dd687bc
Show file tree
Hide file tree
Showing 33 changed files with 828 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public JobGraph compileJobGraph(OptimizedPlan program) {

// create the jobgraph object
JobGraph graph = new JobGraph(program.getJobName());
graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
graph.setAllowQueuedScheduling(false);

// add vertices to the graph
Expand Down
30 changes: 30 additions & 0 deletions flink-core/src/main/java/org/apache/flink/api/common/Plan.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class Plan implements Visitable<Operator<?>> {
* The default parallelism to use for nodes that have no explicitly specified parallelism.
*/
protected int defaultParallelism = DEFAULT_PARALELLISM;

/**
* The number of times failed tasks are re-executed.
*/
protected int numberOfExecutionRetries;

/**
* Hash map for files in the distributed cache: registered name to cache entry.
Expand Down Expand Up @@ -258,6 +263,31 @@ public void setDefaultParallelism(int defaultParallelism) {
this.defaultParallelism = defaultParallelism;
}

/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of {@code -1} indicates that the system
* default value (as defined in the configuration) should be used.
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
}

/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of {@code -1} indicates that the system default value (as defined in the configuration)
* should be used.
*
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries;
}

/**
* Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
* for data types and is specific to a particular data model (record, tuple, Scala, ...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.configuration;

/**
Expand All @@ -36,6 +35,12 @@ public final class ConfigConstants {
*/
public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";

/**
* Config parameter for the number of re-tries for failed tasks. Setting this
* value to 0 effectively disables fault tolerance.
*/
public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";

// -------------------------------- Runtime -------------------------------

/**
Expand Down Expand Up @@ -313,6 +318,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;

/**
* The default number of execution retries.
*/
public static final int DEFAULT_EXECUTION_RETRIES = 0;

// ------------------------------ Runtime ---------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public abstract class ExecutionEnvironment {

private int degreeOfParallelism = -1;

private int numberOfExecutionRetries = -1;


// --------------------------------------------------------------------------------------------
// Constructor and Properties
Expand Down Expand Up @@ -143,6 +145,31 @@ public void setDegreeOfParallelism(int degreeOfParallelism) {
this.degreeOfParallelism = degreeOfParallelism;
}

/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of {@code -1} indicates that the system
* default value (as defined in the configuration) should be used.
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
}

/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of {@code -1} indicates that the system default value (as defined in the configuration)
* should be used.
*
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return numberOfExecutionRetries;
}

/**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
Expand Down Expand Up @@ -652,6 +679,7 @@ public JavaPlan createProgramPlan(String jobName) {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);

try {
registerCachedFilesWithPlan(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,13 @@ void markFinished() {

if (transitionState(current, FINISHED)) {
try {
vertex.executionFinished();
return;
assignedResource.releaseSlot();
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.getExecutionGraph().deregisterExecution(this);
assignedResource.releaseSlot();
vertex.executionFinished();
}
return;
}
}
else if (current == CANCELING) {
Expand Down Expand Up @@ -433,14 +433,14 @@ void cancelingComplete() {
if (current == CANCELED) {
return;
}
else if (current == CANCELING || current == RUNNING) {
else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
if (transitionState(current, CANCELED)) {
try {
vertex.executionCanceled();
assignedResource.releaseSlot();
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.getExecutionGraph().deregisterExecution(this);
assignedResource.releaseSlot();
vertex.executionCanceled();
}
return;
}
Expand Down Expand Up @@ -493,13 +493,13 @@ private boolean processFail(Throwable t, boolean isCallback) {
this.failureCause = t;

try {
vertex.getExecutionGraph().deregisterExecution(this);
vertex.executionFailed(t);
}
finally {
if (assignedResource != null) {
assignedResource.releaseSlot();
}
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.executionFailed(t);
}

if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class ExecutionEdge {

private final int inputNum;

private final ChannelID inputChannelId;
private ChannelID inputChannelId;

private final ChannelID outputChannelId;
private ChannelID outputChannelId;


public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
Expand All @@ -42,15 +42,6 @@ public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target,
this.outputChannelId = new ChannelID();
}

public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
this.source = source;
this.target = target;
this.inputNum = inputNum;

this.inputChannelId = inputChannelId;
this.outputChannelId = outputChannelId;
}


public IntermediateResultPartition getSource() {
return source;
Expand All @@ -71,4 +62,9 @@ public ChannelID getInputChannelId() {
public ChannelID getOutputChannelId() {
return outputChannelId;
}

public void assignNewChannelIDs() {
inputChannelId = new ChannelID();
outputChannelId = new ChannelID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
Expand Down Expand Up @@ -106,6 +105,8 @@ public class ExecutionGraph {

private int nextVertexToFinish;

private int numberOfRetriesLeft;

private volatile JobStatus state = JobStatus.CREATED;

private volatile Throwable failureCause;
Expand Down Expand Up @@ -147,6 +148,17 @@ public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,

// --------------------------------------------------------------------------------------------

public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
if (numberOfRetriesLeft < -1) {
throw new IllegalArgumentException();
}
this.numberOfRetriesLeft = numberOfRetriesLeft;
}

public int getNumberOfRetriesLeft() {
return numberOfRetriesLeft;
}

public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
Expand Down Expand Up @@ -344,8 +356,14 @@ else if (transitionState(current, JobStatus.FAILING, t)) {

public void waitForJobEnd(long timeout) throws InterruptedException {
synchronized (progressLock) {
while (nextVertexToFinish < verticesInCreationOrder.size()) {
progressLock.wait(timeout);

long now = System.currentTimeMillis();
long deadline = timeout == 0 ? Long.MAX_VALUE : now + timeout;


while (now < deadline && !state.isTerminalState()) {
progressLock.wait(deadline - now);
now = System.currentTimeMillis();
}
}
}
Expand Down Expand Up @@ -403,8 +421,21 @@ void jobVertexInFinalState(ExecutionJobVertex ev) {
if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
break;
}
if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
break;
if (current == JobStatus.FAILING) {
if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
numberOfRetriesLeft--;

execute(new Runnable() {
@Override
public void run() {
restart();
}
});
break;
}
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
break;
}
}
if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
fail(new Exception("ExecutionGraph went into final state from state " + current));
Expand Down Expand Up @@ -659,4 +690,38 @@ public void execute(Runnable action) {
action.run();
}
}

public void restart() {
try {
if (state == JobStatus.FAILED) {
transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
}
synchronized (progressLock) {
if (state != JobStatus.RESTARTING) {
throw new IllegalStateException("Can only restart job from state restarting.");
}
if (scheduler == null) {
throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
}

this.currentExecutions.clear();
this.edges.clear();

for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
jv.resetForNewExecution();
}

for (int i = 0; i < stateTimestamps.length; i++) {
stateTimestamps[i] = 0;
}
nextVertexToFinish = 0;
transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
}

scheduleForExecution(scheduler);
}
catch (Throwable t) {
fail(t);
}
}
}
Loading

0 comments on commit dd687bc

Please sign in to comment.