Skip to content

Commit

Permalink
[FLINK-21189][runtime] Adds timestamp to FailureHandlingResult
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp authored and zentol committed Mar 31, 2021
1 parent 507ec83 commit 39c959c
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void suspend(Throwable suspensionCause) {
// stay in a terminal state
return;
} else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) {
initFailureCause(suspensionCause);
initFailureCause(suspensionCause, System.currentTimeMillis());

incrementRestarts();

Expand Down Expand Up @@ -1056,9 +1056,9 @@ public void incrementRestarts() {
}

@Override
public void initFailureCause(Throwable t) {
public void initFailureCause(Throwable t, long timestamp) {
this.failureCause = t;
this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
this.failureInfo = new ErrorInfo(t, timestamp);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1147,13 +1147,13 @@ private void allVerticesInTerminalState() {
}

@Override
public void failJob(Throwable cause) {
public void failJob(Throwable cause, long timestamp) {
if (state == JobStatus.FAILING || state.isTerminalState()) {
return;
}

transitionState(JobStatus.FAILING, cause);
initFailureCause(cause);
initFailureCause(cause, timestamp);

FutureUtils.assertNoException(
cancelVerticesAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void enableCheckpointing(
*/
void suspend(Throwable suspensionCause);

void failJob(Throwable cause);
void failJob(Throwable cause, long timestamp);

/**
* Returns the termination future of this {@link ExecutionGraph}. The termination future is
Expand All @@ -181,7 +181,7 @@ void enableCheckpointing(

void incrementRestarts();

void initFailureCause(Throwable t);
void initFailureCause(Throwable t, long timestamp);

/**
     * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ public ExecutionFailureHandler(
*
* @param failedTask is the ID of the failed task vertex
* @param cause of the task failure
* @param timestamp of the task failure
* @return result of the failure handling
*/
public FailureHandlingResult getFailureHandlingResult(
ExecutionVertexID failedTask, Throwable cause) {
ExecutionVertexID failedTask, Throwable cause, long timestamp) {
return handleFailure(
failedTask,
cause,
timestamp,
failoverStrategy.getTasksNeedingRestart(failedTask, cause),
false);
}
Expand All @@ -91,12 +93,15 @@ public FailureHandlingResult getFailureHandlingResult(
* for it.
*
* @param cause of the task failure
* @param timestamp of the task failure
* @return result of the failure handling
*/
public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) {
public FailureHandlingResult getGlobalFailureHandlingResult(
final Throwable cause, long timestamp) {
return handleFailure(
null,
cause,
timestamp,
IterableUtils.toStream(schedulingTopology.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet()),
Expand All @@ -106,13 +111,15 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
private FailureHandlingResult handleFailure(
@Nullable final ExecutionVertexID failingExecutionVertexId,
final Throwable cause,
long timestamp,
final Set<ExecutionVertexID> verticesToRestart,
final boolean globalFailure) {

if (isUnrecoverableError(cause)) {
return FailureHandlingResult.unrecoverable(
failingExecutionVertexId,
new JobException("The failure is not recoverable", cause),
timestamp,
globalFailure);
}

Expand All @@ -123,6 +130,7 @@ private FailureHandlingResult handleFailure(
return FailureHandlingResult.restartable(
failingExecutionVertexId,
cause,
timestamp,
verticesToRestart,
restartBackoffTimeStrategy.getBackoffTime(),
globalFailure);
Expand All @@ -131,6 +139,7 @@ private FailureHandlingResult handleFailure(
failingExecutionVertexId,
new JobException(
"Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
timestamp,
globalFailure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class FailureHandlingResult {
/** Failure reason. {@code @Nullable} because of FLINK-21376. */
@Nullable private final Throwable error;

/** Failure timestamp. */
private final long timestamp;

/** True if the original failure was a global failure. */
private final boolean globalFailure;

Expand All @@ -65,13 +68,15 @@ public class FailureHandlingResult {
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param cause the exception that caused this failure.
* @param timestamp the time the failure was handled.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
Expand All @@ -81,6 +86,7 @@ private FailureHandlingResult(
this.restartDelayMS = restartDelayMS;
this.failingExecutionVertexId = failingExecutionVertexId;
this.error = cause;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}

Expand All @@ -91,15 +97,18 @@ private FailureHandlingResult(
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp the time the failure was handled.
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.failingExecutionVertexId = failingExecutionVertexId;
this.error = checkNotNull(error);
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}

Expand Down Expand Up @@ -147,11 +156,19 @@ public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
*
* @return reason why the restarting cannot be conducted
*/
@Nullable
public Throwable getError() {
return error;
}

/**
* Returns the time of the failure.
*
* @return The timestamp.
*/
public long getTimestamp() {
return timestamp;
}

/**
* Returns whether the restarting can be conducted.
*
Expand All @@ -178,6 +195,8 @@ public boolean isGlobalFailure() {
     * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param cause The reason of the failure.
* @param timestamp The time of the failure.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
Expand All @@ -186,11 +205,17 @@ public boolean isGlobalFailure() {
public static FailureHandlingResult restartable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
return new FailureHandlingResult(
failingExecutionVertexId, cause, verticesToRestart, restartDelayMS, globalFailure);
failingExecutionVertexId,
cause,
timestamp,
verticesToRestart,
restartDelayMS,
globalFailure);
}

/**
Expand All @@ -203,12 +228,14 @@ public static FailureHandlingResult restartable(
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp The time of the failure.
* @return result indicating the failure is not recoverable
*/
public static FailureHandlingResult unrecoverable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
return new FailureHandlingResult(failingExecutionVertexId, error, globalFailure);
return new FailureHandlingResult(failingExecutionVertexId, error, timestamp, globalFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ private void maybeHandleTaskFailure(

private void handleTaskFailure(
final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
setGlobalFailureCause(error);
final long timestamp = System.currentTimeMillis();
setGlobalFailureCause(error, timestamp);
notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
final FailureHandlingResult failureHandlingResult =
executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
executionFailureHandler.getFailureHandlingResult(
executionVertexId, error, timestamp);
maybeRestartTasks(failureHandlingResult);
}

Expand All @@ -217,19 +219,20 @@ private void notifyCoordinatorsAboutTaskFailure(

@Override
public void handleGlobalFailure(final Throwable error) {
setGlobalFailureCause(error);
final long timestamp = System.currentTimeMillis();
setGlobalFailureCause(error, timestamp);

log.info("Trying to recover from a global failure.", error);
final FailureHandlingResult failureHandlingResult =
executionFailureHandler.getGlobalFailureHandlingResult(error);
executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp);
maybeRestartTasks(failureHandlingResult);
}

private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
if (failureHandlingResult.canRestart()) {
restartTasksWithDelay(failureHandlingResult);
} else {
failJob(failureHandlingResult.getError());
failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,19 +462,19 @@ protected void transitionToScheduled(final List<ExecutionVertexID> verticesToDep
.transitionState(ExecutionState.SCHEDULED));
}

protected void setGlobalFailureCause(@Nullable final Throwable cause) {
protected void setGlobalFailureCause(@Nullable final Throwable cause, long timestamp) {
if (cause != null) {
executionGraph.initFailureCause(cause);
executionGraph.initFailureCause(cause, timestamp);
}
}

protected ComponentMainThreadExecutor getMainThreadExecutor() {
return mainThreadExecutor;
}

protected void failJob(Throwable cause) {
protected void failJob(Throwable cause, long timestamp) {
incrementVersionsOfAllVertices();
executionGraph.failJob(cause);
executionGraph.failJob(cause, timestamp);
getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
}

Expand Down Expand Up @@ -629,9 +629,8 @@ protected final void archiveFromFailureHandlingResult(
executionVertexId,
rootEntry.getExceptionAsString());
} else {
// fallback in case of a global fail over - no failed state is set and, therefore, no
// timestamp was taken
archiveGlobalFailure(failureHandlingResult.getError(), System.currentTimeMillis());
archiveGlobalFailure(
failureHandlingResult.getError(), failureHandlingResult.getTimestamp());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Failing extends StateWithExecutionGraph {
super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
this.context = context;

getExecutionGraph().failJob(failureCause);
getExecutionGraph().failJob(failureCause, System.currentTimeMillis());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testShutdownCheckpointCoordinatorOnFailure() throws Exception {
assertThat(checkpointCoordinator, Matchers.notNullValue());
assertThat(checkpointCoordinator.isShutdown(), is(false));

graph.failJob(new Exception("Test Exception"));
graph.failJob(new Exception("Test Exception"), System.currentTimeMillis());

assertThat(checkpointCoordinator.isShutdown(), is(true));
assertThat(counterShutdownFuture.get(), is(JobStatus.FAILED));
Expand Down
Loading

0 comments on commit 39c959c

Please sign in to comment.