Skip to content

Commit

Permalink
[FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined…
Browse files Browse the repository at this point in the history
… classes.

That way, holding on to the ErrorInfo does not prevent class unloading.

However, this implies that the ErrorInfo must not hold strong references to any Exception classes.
For that reason, the commit pull the "ground truth" exception into a separate fields, so that the
ExecutionGraph logic itself can always assume to have the proper ground-truth exception.

This closes apache#5348
  • Loading branch information
StephanEwen committed Jan 24, 2018
1 parent 012413c commit 524c501
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();

assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public interface AccessExecutionGraph {
* @return failure causing exception, or null
*/
@Nullable
ErrorInfo getFailureCause();
ErrorInfo getFailureInfo();

/**
* Returns the job vertex for the given {@link JobVertexID}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public JobStatus getState() {
}

@Override
public ErrorInfo getFailureCause() {
public ErrorInfo getFailureInfo() {
return failureCause;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
Expand All @@ -32,26 +30,25 @@ public class ErrorInfo implements Serializable {

private static final long serialVersionUID = -6138942031953594202L;

private final transient Throwable exception;
private final long timestamp;
/** The exception that we keep holding forever. Has no strong reference to any user-defined code. */
private final SerializedThrowable exception;

private volatile String exceptionAsString;
private final long timestamp;

public ErrorInfo(Throwable exception, long timestamp) {
Preconditions.checkNotNull(exception);
Preconditions.checkArgument(timestamp > 0);

this.exception = exception;
this.exception = exception instanceof SerializedThrowable ?
(SerializedThrowable) exception : new SerializedThrowable(exception);
this.timestamp = timestamp;
}

/**
* Returns the contained exception.
*
* @return contained exception, or {@code "(null)"} if either no exception was set or this object has been deserialized
* Returns the serialized form of the original exception.
*/
Throwable getException() {
return exception;
public SerializedThrowable getException() {
return this.exception;
}

/**
Expand All @@ -60,10 +57,7 @@ Throwable getException() {
* @return failure causing exception as a string, or {@code "(null)"}
*/
public String getExceptionAsString() {
if (exceptionAsString == null) {
exceptionAsString = ExceptionUtils.stringifyException(exception);
}
return exceptionAsString;
return exception.getFullStringifiedStackTrace();
}

/**
Expand All @@ -74,12 +68,4 @@ public String getExceptionAsString() {
public long getTimestamp() {
return timestamp;
}

private void writeObject(ObjectOutputStream out) throws IOException {
// make sure that the exception was stringified so it isn't lost during serialization
if (exceptionAsString == null) {
exceptionAsString = ExceptionUtils.stringifyException(exception);
}
out.defaultWriteObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive

/** The exception that caused the job to fail. This is set to the first root exception
* that was not recoverable and triggered job failure */
private volatile ErrorInfo failureCause;
private volatile Throwable failureCause;

/** The extended failure cause information for the job. This exists in addition to 'failureCause',
* to let 'failureCause' be a strong reference to the exception, while this info holds no
* strong reference to any user-defined classes.*/
private volatile ErrorInfo failureInfo;

// ------ Fields that are relevant to the execution and need to be cleared before archiving -------

Expand Down Expand Up @@ -619,10 +624,14 @@ public JobStatus getState() {
return state;
}

public ErrorInfo getFailureCause() {
public Throwable getFailureCause() {
return failureCause;
}

public ErrorInfo getFailureInfo() {
return failureInfo;
}

/**
* Gets the number of full restarts that the execution graph went through.
* If a full restart recovery is currently pending, this recovery is included in the
Expand Down Expand Up @@ -1034,33 +1043,14 @@ public void stop() throws StoppingException {
* @param suspensionCause Cause of the suspension
*/
public void suspend(Throwable suspensionCause) {
suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
}

/**
* Suspends the current ExecutionGraph.
*
* The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
* state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
*
* The SUSPENDED state is a local terminal state which stops the execution of the job but does
* not remove the job from the HA job store so that it can be recovered by another JobManager.
*
* @param errorInfo ErrorInfo containing the cause of the suspension
*/
public void suspend(ErrorInfo errorInfo) {
Throwable suspensionCause = errorInfo != null
? errorInfo.getException()
: null;

while (true) {
JobStatus currentState = state;

if (currentState.isTerminalState()) {
// stay in a terminal state
return;
} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
this.failureCause = errorInfo;
initFailureCause(suspensionCause);

// make sure no concurrent local actions interfere with the cancellation
incrementGlobalModVersion();
Expand All @@ -1080,10 +1070,6 @@ public void suspend(ErrorInfo errorInfo) {
}
}

public void failGlobal(Throwable error) {
failGlobal(new ErrorInfo(error, System.currentTimeMillis()));
}

/**
* Fails the execution graph globally. This failure will not be recovered by a specific
* failover strategy, but results in a full restart of all tasks.
Expand All @@ -1093,13 +1079,9 @@ public void failGlobal(Throwable error) {
* exceptions that indicate a bug or an unexpected call race), and where a full restart is the
* safe way to get consistency back.
*
* @param errorInfo ErrorInfo containing the exception that caused the failure.
* @param t The exception that caused the failure.
*/
public void failGlobal(ErrorInfo errorInfo) {
Throwable t = errorInfo != null
? errorInfo.getException()
: null;

public void failGlobal(Throwable t) {
while (true) {
JobStatus current = state;
// stay in these states
Expand All @@ -1111,15 +1093,15 @@ public void failGlobal(ErrorInfo errorInfo) {
else if (current == JobStatus.RESTARTING) {
// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
this.failureCause = errorInfo;
initFailureCause(t);

final long globalVersionForRestart = incrementGlobalModVersion();
if (tryRestartOrFail(globalVersionForRestart)) {
return;
}
}
else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = errorInfo;
initFailureCause(t);

// make sure no concurrent local or global actions interfere with the failover
final long globalVersionForRestart = incrementGlobalModVersion();
Expand Down Expand Up @@ -1322,6 +1304,11 @@ private long incrementGlobalModVersion() {
return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
}

private void initFailureCause(Throwable t) {
this.failureCause = t;
this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
}

// ------------------------------------------------------------------------
// Job Status Progress
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1417,9 +1404,8 @@ private boolean tryRestartOrFail(long globalModVersionForRestart) {
JobStatus currentState = state;

if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
Throwable failureCause = this.failureCause != null
? this.failureCause.getException()
: null;
final Throwable failureCause = this.failureCause;

synchronized (progressLock) {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
Expand Down Expand Up @@ -1696,7 +1682,7 @@ void notifyExecutionChange(
catch (Throwable t) {
// bug in the failover strategy - fall back to global failover
LOG.warn("Error in failover strategy - falling back to global restart", t);
failGlobal(new ErrorInfo(ex, timestamp));
failGlobal(ex);
}
}
}
Expand Down Expand Up @@ -1728,7 +1714,7 @@ public ArchivedExecutionGraph archive() {
archivedVerticesInCreationOrder,
stateTimestamps,
getState(),
failureCause,
failureInfo,
getJsonPlan(),
getAccumulatorResultsStringified(),
serializedUserAccumulators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JobExceptionsHandler(

@Override
protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
ErrorInfo rootException = executionGraph.getFailureCause();
ErrorInfo rootException = executionGraph.getFailureInfo();
String rootExceptionMessage = null;
Long rootTimestamp = null;
if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static String createJobExceptionsJson(AccessExecutionGraph graph) throws
gen.writeStartObject();

// most important is the root failure cause
ErrorInfo rootException = graph.getFailureCause();
ErrorInfo rootException = graph.getFailureInfo();
if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
gen.writeStringField("root-exception", rootException.getExceptionAsString());
gen.writeNumberField("timestamp", rootException.getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, Acc
assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
assertEquals(runtimeGraph.getState(), archivedGraph.getState());
assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
assertEquals(runtimeGraph.getFailureInfo().getExceptionAsString(), archivedGraph.getFailureInfo().getExceptionAsString());
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.Test;

import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

import static org.junit.Assert.assertEquals;

/**
* Simple test for the {@link ErrorInfo}.
*/
public class ErrorInfoTest {

@Test
public void testSerializationWithExceptionOutsideClassLoader() throws Exception {
final ErrorInfo error = new ErrorInfo(new ExceptionWithCustomClassLoader(), System.currentTimeMillis());
final ErrorInfo copy = CommonTestUtils.createCopySerializable(error);

assertEquals(error.getTimestamp(), copy.getTimestamp());
assertEquals(error.getExceptionAsString(), copy.getExceptionAsString());
assertEquals(error.getException().getMessage(), copy.getException().getMessage());

}

// ------------------------------------------------------------------------

private static final class ExceptionWithCustomClassLoader extends Exception {

private static final long serialVersionUID = 42L;

private static final ClassLoader CUSTOM_LOADER = new URLClassLoader(new URL[0]);

@SuppressWarnings("unused")
private final Serializable outOfClassLoader = CommonTestUtils.createObjectForClassNotInClassPath(CUSTOM_LOADER);

public ExceptionWithCustomClassLoader() {
super("tada");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {

waitUntilJobStatus(eg, JobStatus.FAILED, 1000);

final Throwable t = eg.getFailureCause().getException();
final Throwable t = eg.getFailureCause();
if (!(t instanceof NoResourceAvailableException)) {
ExceptionUtils.rethrowException(t, t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void testSuspendWhileRestarting() throws Exception {

assertEquals(JobStatus.SUSPENDED, eg.getState());

assertEquals(exception, eg.getFailureCause().getException());
assertEquals(exception, eg.getFailureCause());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public void testJsonGeneration() throws Exception {
private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);

Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
Assert.assertEquals(originalJob.getFailureInfo().getExceptionAsString(), result.get("root-exception").asText());
Assert.assertEquals(originalJob.getFailureInfo().getTimestamp(), result.get("timestamp").asLong());

ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");

Expand Down

0 comments on commit 524c501

Please sign in to comment.