Skip to content

Commit

Permalink
[FLINK-34922][rest] Support concurrent global failure
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Mar 28, 2024
1 parent 83f82ab commit dc957bf
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,19 @@ private static JobExceptionsInfoWithHistory.RootExceptionInfo createRootExceptio

private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
ExceptionHistoryEntry exceptionHistoryEntry) {

if (exceptionHistoryEntry.isGlobal()) {
return new JobExceptionsInfoWithHistory.ExceptionInfo(
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
exceptionHistoryEntry.getExceptionAsString(),
exceptionHistoryEntry.getTimestamp(),
exceptionHistoryEntry.getFailureLabels(),
null,
null,
null,
null);
}

assertLocalExceptionInfo(exceptionHistoryEntry);

return new JobExceptionsInfoWithHistory.ExceptionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -216,6 +217,47 @@ void testWithExceptionHistory()
assertThat(response.getExceptionHistory().isTruncated()).isFalse();
}

@Test
void testWithExceptionHistoryAndConcurrentGlobalFailure()
throws HandlerRequestException, ExecutionException, InterruptedException {
final ExceptionHistoryEntry otherFailure =
ExceptionHistoryEntry.createGlobal(
new RuntimeException("exception #1"),
CompletableFuture.completedFuture(Collections.emptyMap()));
final RootExceptionHistoryEntry rootCause =
fromGlobalFailure(
new RuntimeException("exception #0"),
System.currentTimeMillis(),
Collections.singleton(otherFailure));

final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause);
final HandlerRequest<EmptyRequestBody> request =
createRequest(executionGraphInfo.getJobId(), 10);
final JobExceptionsInfoWithHistory response =
testInstance.handleRequest(request, executionGraphInfo);

assertThat(response.getExceptionHistory().getEntries())
.hasSize(1)
.satisfies(
matching(
contains(
historyContainsGlobalFailure(
rootCause.getException(),
rootCause.getTimestamp(),
matchesFailure(
otherFailure.getException(),
otherFailure.getTimestamp(),
otherFailure.getFailureLabelsFuture(),
otherFailure.getFailingTaskName(),
JobExceptionsHandler.toString(
otherFailure
.getTaskManagerLocation()),
JobExceptionsHandler.toTaskManagerId(
otherFailure
.getTaskManagerLocation()))))));
assertThat(response.getExceptionHistory().isTruncated()).isFalse();
}

@Test
void testWithExceptionHistoryWithMatchingFailureLabel()
throws HandlerRequestException, ExecutionException, InterruptedException {
Expand Down Expand Up @@ -542,13 +584,20 @@ private static HandlerRequest<EmptyRequestBody> createRequest(
}

private static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) {
return fromGlobalFailure(cause, timestamp, Collections.emptySet());
}

private static RootExceptionHistoryEntry fromGlobalFailure(
Throwable cause,
long timestamp,
Collection<ExceptionHistoryEntry> concurrentExceptions) {
return new RootExceptionHistoryEntry(
cause,
timestamp,
FailureEnricherUtils.EMPTY_FAILURE_LABELS,
null,
null,
Collections.emptySet());
concurrentExceptions);
}

// -------- factory methods for instantiating new Matchers --------
Expand Down

0 comments on commit dc957bf

Please sign in to comment.