Skip to content

Commit

Permalink
[FLINK-21189][runtime] Introduces ExceptionHistoryEntryExtractor
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp authored and zentol committed Mar 31, 2021
1 parent e10a723 commit 507ec83
Show file tree
Hide file tree
Showing 15 changed files with 1,106 additions and 431 deletions.
26 changes: 25 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:RootExceptionInfo",
"properties" : {
"exceptionName" : {
"type" : "string"
Expand All @@ -1628,6 +1628,30 @@
},
"location" : {
"type" : "string"
},
"concurrentExceptions" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
"properties" : {
"exceptionName" : {
"type" : "string"
},
"stacktrace" : {
"type" : "string"
},
"timestamp" : {
"type" : "integer"
},
"taskName" : {
"type" : "string"
},
"location" : {
"type" : "string"
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
Expand All @@ -56,6 +57,7 @@
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Handler serving the job exceptions. */
public class JobExceptionsHandler
Expand Down Expand Up @@ -153,46 +155,69 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize));
}

static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory(
Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
private static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory(
Iterable<RootExceptionHistoryEntry> historyEntries, int limit) {
// we need to reverse the history to have a stable result when doing paging on it
final List<ExceptionHistoryEntry> reversedHistoryEntries = new ArrayList<>();
final List<RootExceptionHistoryEntry> reversedHistoryEntries = new ArrayList<>();
Iterables.addAll(reversedHistoryEntries, historyEntries);
Collections.reverse(reversedHistoryEntries);

List<JobExceptionsInfoWithHistory.ExceptionInfo> exceptionHistoryEntries =
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptionHistoryEntries =
reversedHistoryEntries.stream()
.limit(limit)
.map(JobExceptionsHandler::createExceptionInfo)
.map(JobExceptionsHandler::createRootExceptionInfo)
.collect(Collectors.toList());

return new JobExceptionsInfoWithHistory.JobExceptionHistory(
exceptionHistoryEntries,
exceptionHistoryEntries.size() < reversedHistoryEntries.size());
}

private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
ExceptionHistoryEntry historyEntry) {
private static JobExceptionsInfoWithHistory.RootExceptionInfo createRootExceptionInfo(
RootExceptionHistoryEntry historyEntry) {
final List<JobExceptionsInfoWithHistory.ExceptionInfo> concurrentExceptions =
StreamSupport.stream(historyEntry.getConcurrentExceptions().spliterator(), false)
.map(JobExceptionsHandler::createExceptionInfo)
.collect(Collectors.toList());

if (historyEntry.isGlobal()) {
return new JobExceptionsInfoWithHistory.ExceptionInfo(
return new JobExceptionsInfoWithHistory.RootExceptionInfo(
historyEntry.getException().getOriginalErrorClassName(),
historyEntry.getExceptionAsString(),
historyEntry.getTimestamp());
historyEntry.getTimestamp(),
concurrentExceptions);
}

Preconditions.checkArgument(
historyEntry.getFailingTaskName() != null,
"The taskName must not be null for a non-global failure.");
Preconditions.checkArgument(
historyEntry.getTaskManagerLocation() != null,
"The location must not be null for a non-global failure.");
assertLocalExceptionInfo(historyEntry);

return new JobExceptionsInfoWithHistory.ExceptionInfo(
return new JobExceptionsInfoWithHistory.RootExceptionInfo(
historyEntry.getException().getOriginalErrorClassName(),
historyEntry.getExceptionAsString(),
historyEntry.getTimestamp(),
historyEntry.getFailingTaskName(),
toString(historyEntry.getTaskManagerLocation()));
toString(historyEntry.getTaskManagerLocation()),
concurrentExceptions);
}

/**
 * Converts a single (non-root) {@code ExceptionHistoryEntry} into its JSON-serializable
 * counterpart.
 *
 * @param entry the history entry to convert; must describe a non-global failure, i.e. it
 *     carries a failing task name and a TaskManager location.
 * @return the {@code ExceptionInfo} holding the exception name, stringified stacktrace,
 *     timestamp, task name, and location of the given entry.
 */
private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
        ExceptionHistoryEntry entry) {
    // Non-global failures are required to provide task name and location.
    assertLocalExceptionInfo(entry);

    return new JobExceptionsInfoWithHistory.ExceptionInfo(
            entry.getException().getOriginalErrorClassName(),
            entry.getExceptionAsString(),
            entry.getTimestamp(),
            entry.getFailingTaskName(),
            toString(entry.getTaskManagerLocation()));
}

/**
 * Validates that the given entry provides the task-level information that is mandatory for
 * a non-global failure.
 *
 * @param entry the entry to validate.
 * @throws IllegalArgumentException if the failing task name or the TaskManager location is
 *     {@code null}.
 */
private static void assertLocalExceptionInfo(ExceptionHistoryEntry entry) {
    Preconditions.checkArgument(
            entry.getFailingTaskName() != null,
            "The taskName must not be null for a non-global failure.");
    Preconditions.checkArgument(
            entry.getTaskManagerLocation() != null,
            "The location must not be null for a non-global failure.");
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -117,21 +118,21 @@ public static final class JobExceptionHistory {
public static final String FIELD_NAME_TRUNCATED = "truncated";

@JsonProperty(FIELD_NAME_ENTRIES)
private final List<ExceptionInfo> entries;
private final List<RootExceptionInfo> entries;

@JsonProperty(FIELD_NAME_TRUNCATED)
private final boolean truncated;

@JsonCreator
public JobExceptionHistory(
@JsonProperty(FIELD_NAME_ENTRIES) List<ExceptionInfo> entries,
@JsonProperty(FIELD_NAME_ENTRIES) List<RootExceptionInfo> entries,
@JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
this.entries = entries;
this.truncated = truncated;
}

@JsonIgnore
public List<ExceptionInfo> getEntries() {
public List<RootExceptionInfo> getEntries() {
return entries;
}

Expand Down Expand Up @@ -169,7 +170,10 @@ public String toString() {
}
}

/** Collects the information of a single exception. */
/**
* Json equivalent of {@link
* org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}.
*/
public static class ExceptionInfo {

public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
Expand Down Expand Up @@ -276,4 +280,73 @@ public String toString() {
.toString();
}
}

/**
 * Json equivalent of {@link
 * org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}.
 *
 * <p>Extends {@link ExceptionInfo} by the collection of exceptions that occurred
 * concurrently to the root failure.
 */
public static class RootExceptionInfo extends ExceptionInfo {

    public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS = "concurrentExceptions";

    // Exceptions that happened concurrently to the root cause represented by this entry.
    @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
    private final Collection<ExceptionInfo> concurrentExceptions;

    /**
     * Convenience constructor for global failures, i.e. entries without a task name or
     * location (both are passed as {@code null}).
     */
    public RootExceptionInfo(
            String exceptionName,
            String stacktrace,
            long timestamp,
            Collection<ExceptionInfo> concurrentExceptions) {
        this(exceptionName, stacktrace, timestamp, null, null, concurrentExceptions);
    }

    /**
     * Creates a {@code RootExceptionInfo}; used by Jackson for deserialization.
     *
     * @param exceptionName the class name of the exception.
     * @param stacktrace the stringified stacktrace.
     * @param timestamp the time the failure occurred.
     * @param taskName the name of the failing task; {@code null} for global failures.
     * @param location the TaskManager location; {@code null} for global failures.
     * @param concurrentExceptions exceptions that occurred concurrently to this one.
     */
    @JsonCreator
    public RootExceptionInfo(
            @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
            @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
            @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
            @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
            @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location,
            @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
                    Collection<ExceptionInfo> concurrentExceptions) {
        super(exceptionName, stacktrace, timestamp, taskName, location);
        this.concurrentExceptions = concurrentExceptions;
    }

    @JsonIgnore
    public Collection<ExceptionInfo> getConcurrentExceptions() {
        return concurrentExceptions;
    }

    // hashCode and equals are necessary for the test classes deriving from
    // RestResponseMarshallingTestBase
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // getClass() comparison (rather than instanceof) keeps equals symmetric with the
        // superclass; super.equals covers the inherited fields.
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        RootExceptionInfo that = (RootExceptionInfo) o;
        return getConcurrentExceptions().equals(that.getConcurrentExceptions());
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), getConcurrentExceptions());
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", RootExceptionInfo.class.getSimpleName() + "[", "]")
                .add("exceptionName='" + getExceptionName() + "'")
                .add("stacktrace='" + getStacktrace() + "'")
                .add("timestamp=" + getTimestamp())
                .add("taskName='" + getTaskName() + "'")
                .add("location='" + getLocation() + "'")
                .add("concurrentExceptions=" + getConcurrentExceptions())
                .toString();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;

import java.io.Serializable;
import java.util.Collections;
Expand All @@ -34,15 +35,15 @@ public class ExecutionGraphInfo implements Serializable {
private static final long serialVersionUID = -6134203195124124202L;

private final ArchivedExecutionGraph executionGraph;
private final Iterable<ExceptionHistoryEntry> exceptionHistory;
private final Iterable<RootExceptionHistoryEntry> exceptionHistory;

public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
this(executionGraph, Collections.emptyList());
}

public ExecutionGraphInfo(
ArchivedExecutionGraph executionGraph,
Iterable<ExceptionHistoryEntry> exceptionHistory) {
Iterable<RootExceptionHistoryEntry> exceptionHistory) {
this.executionGraph = executionGraph;
this.exceptionHistory = exceptionHistory;
}
Expand All @@ -55,7 +56,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph() {
return executionGraph;
}

public Iterable<ExceptionHistoryEntry> getExceptionHistory() {
public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
return exceptionHistory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryExtractor;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
Expand Down Expand Up @@ -145,7 +147,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling

private final ComponentMainThreadExecutor mainThreadExecutor;

private final BoundedFIFOQueue<ExceptionHistoryEntry> exceptionHistory;
private final ExceptionHistoryEntryExtractor exceptionHistoryEntryExtractor;
private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;

private final ExecutionGraphFactory executionGraphFactory;

Expand Down Expand Up @@ -210,7 +213,9 @@ public SchedulerBase(
this.operatorCoordinatorHandler =
new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
exceptionHistory =

this.exceptionHistoryEntryExtractor = new ExceptionHistoryEntryExtractor();
this.exceptionHistory =
new BoundedFIFOQueue<>(
jobMasterConfiguration.getInteger(
JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE));
Expand Down Expand Up @@ -598,30 +603,31 @@ protected final void archiveGlobalFailure(@Nullable Throwable failure) {
archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
}

protected final void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) {
exceptionHistory.add(ExceptionHistoryEntry.fromGlobalFailure(failure, timestamp));
private void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) {
exceptionHistory.add(
exceptionHistoryEntryExtractor.extractGlobalFailure(
executionGraph.getAllExecutionVertices(), failure, timestamp));
log.debug("Archive global failure.", failure);
}

protected final void archiveFromFailureHandlingResult(
FailureHandlingResult failureHandlingResult) {
final Optional<Execution> executionOptional =
failureHandlingResult
.getExecutionVertexIdOfFailedTask()
.map(this::getExecutionVertex)
.map(ExecutionVertex::getCurrentExecutionAttempt);

if (executionOptional.isPresent()) {
final Execution failedExecution = executionOptional.get();
final ExceptionHistoryEntry exceptionHistoryEntry =
ExceptionHistoryEntry.fromFailedExecution(
failedExecution,
failedExecution.getVertex().getTaskNameWithSubtaskIndex());
exceptionHistory.add(exceptionHistoryEntry);
if (failureHandlingResult.getExecutionVertexIdOfFailedTask().isPresent()) {
final ExecutionVertexID executionVertexId =
failureHandlingResult.getExecutionVertexIdOfFailedTask().get();
final RootExceptionHistoryEntry rootEntry =
exceptionHistoryEntryExtractor.extractLocalFailure(
executionGraph.getAllVertices(),
executionVertexId,
failureHandlingResult.getVerticesToRestart().stream()
.filter(v -> !executionVertexId.equals(v))
.collect(Collectors.toSet()));
exceptionHistory.add(rootEntry);

log.debug(
"Archive local failure causing attempt {} to fail: {}",
failedExecution.getAttemptId(),
exceptionHistoryEntry.getExceptionAsString());
executionVertexId,
rootEntry.getExceptionAsString());
} else {
// fallback in case of a global fail over - no failed state is set and, therefore, no
// timestamp was taken
Expand Down Expand Up @@ -709,14 +715,9 @@ public final void notifyPartitionDataAvailable(final ResultPartitionID partition
protected void notifyPartitionDataAvailableInternal(
IntermediateResultPartitionID resultPartitionId) {}

/**
* Returns a copy of the current history of task failures.
*
* @return a copy of the current history of task failures.
*/
@VisibleForTesting
protected Iterable<ExceptionHistoryEntry> getExceptionHistory() {
final Collection<ExceptionHistoryEntry> copy = new ArrayList<>(exceptionHistory.size());
Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
final Collection<RootExceptionHistoryEntry> copy = new ArrayList<>(exceptionHistory.size());
exceptionHistory.forEach(copy::add);

return copy;
Expand Down
Loading

0 comments on commit 507ec83

Please sign in to comment.