diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 707596ce36592..8a5b5867a7293 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -26,6 +26,12 @@ String Dictionary for JobManager to store the archives of completed jobs. + +
jobmanager.exception-history-size
+ 16 + Integer + The maximum number of failures collected by the exception history per job. +
jobmanager.execution.attempts-history-size
16 diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html index ba83988e1c84f..b0e7e7ec51b0f 100644 --- a/docs/layouts/shortcodes/generated/job_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html @@ -32,6 +32,12 @@ String The local address of the network interface that the job manager binds to. If not configured, '0.0.0.0' will be used. + +
jobmanager.exception-history-size
+ 16 + Integer + The maximum number of failures collected by the exception history per job. +
jobmanager.execution.attempts-history-size
16 diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 82e0f0b86dd03..181dd68369c7c 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -2630,7 +2630,7 @@ Response code: 200 OK - Returns the non-recoverable exceptions that have been observed by the job. The truncated flag defines whether more exceptions occurred, but are not listed, because the response would otherwise get too big. + Returns the most recent exceptions that have been handled by Flink for this job. The 'exceptionHistory.truncated' flag defines whether exceptions were filtered out through the GET parameter. The backend collects only a specific amount of most recent exceptions per job. This can be configured through jobmanager.exception-history-size in the Flink configuration. The following first-level members are deprecated: 'root-exception', 'timestamp', 'all-exceptions', 'truncated'. Use the data provided through 'exceptionHistory', instead. 
Path parameters @@ -2685,7 +2685,7 @@ { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory", "properties" : { "all-exceptions" : { "type" : "array", @@ -2708,6 +2708,39 @@ } } }, + "exceptionHistory" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory", + "properties" : { + "entries" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo", + "properties" : { + "exceptionName" : { + "type" : "string" + }, + "location" : { + "type" : "string" + }, + "stacktrace" : { + "type" : "string" + }, + "taskName" : { + "type" : "string" + }, + "timestamp" : { + "type" : "integer" + } + } + } + }, + "truncated" : { + "type" : "boolean" + } + } + }, "root-exception" : { "type" : "string" }, diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index d81f018a54250..04387305f75ac 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -257,6 +257,15 @@ public class JobManagerOptions { .withDescription( "The maximum number of prior execution attempts kept in history."); + /** The maximum number of failures kept in the exception history. */ + @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER) + public static final ConfigOption MAX_EXCEPTION_HISTORY_SIZE = + key("jobmanager.exception-history-size") + .intType() + .defaultValue(16) + .withDescription( + "The maximum number of failures collected by the exception history per job."); + /** * This option specifies the failover strategy, i.e. 
how the job computation recovers from task * failures. diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index ae96ce334f29d..847949e799957 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1572,7 +1572,7 @@ }, "response" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfo", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory", "properties" : { "root-exception" : { "type" : "string" @@ -1603,6 +1603,39 @@ }, "truncated" : { "type" : "boolean" + }, + "exceptionHistory" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory", + "properties" : { + "entries" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo", + "properties" : { + "exceptionName" : { + "type" : "string" + }, + "stacktrace" : { + "type" : "string" + }, + "timestamp" : { + "type" : "integer" + }, + "taskName" : { + "type" : "string" + }, + "location" : { + "type" : "string" + } + } + } + }, + "truncated" : { + "type" : "boolean" + } + } } } } diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts index 83d831b62d6bb..79a86c31e705d 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-exception.ts @@ -21,6 +21,7 @@ export interface JobExceptionInterface { timestamp: number; truncated: boolean; 'all-exceptions': JobExceptionItemInterface[]; + 'exceptionHistory': JobExceptionHistoryInterface; } export interface JobExceptionItemInterface { @@ -32,3 +33,16 @@ export interface 
JobExceptionItemInterface { timestamp: number; 'vertex-id': string; } + +export interface JobExceptionHistoryInterface { + entries: ExceptionInfoInterface[]; + truncated: boolean; +} + +export interface ExceptionInfoInterface { + exceptionName: string; + stacktrace: string; + timestamp: number; + taskName: string; + location: string; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html index ea58a7f3f0c23..8da139f2b686d 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.html @@ -31,6 +31,7 @@ Time + Exception Name Location @@ -40,21 +41,28 @@ {{exception.timestamp | date:'yyyy-MM-dd HH:mm:ss'}} +
{{exception.exceptionName}}
- {{exception.task}} + {{exception.taskName || "(global failure)"}}
- {{exception.location}} + {{exception.location || "(unassigned)"}} - - + + - + +   + The exception history is limited to the most recent failures that caused parts of the job or the entire job to restart. The maximum history size can be configured through the Flink configuration. + + + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts index cb726044069c7..0513993390e8a 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/exceptions/job-exceptions.component.ts @@ -18,7 +18,7 @@ import { formatDate } from '@angular/common'; import { Component, OnInit, ChangeDetectionStrategy, ChangeDetectorRef } from '@angular/core'; -import { JobExceptionItemInterface } from 'interfaces'; +import { ExceptionInfoInterface } from 'interfaces'; import { distinctUntilChanged, flatMap, tap } from 'rxjs/operators'; import { JobService } from 'services'; @@ -30,12 +30,13 @@ import { JobService } from 'services'; }) export class JobExceptionsComponent implements OnInit { rootException = ''; - listOfException: JobExceptionItemInterface[] = []; + listOfException: ExceptionInfoInterface[] = []; truncated = false; isLoading = false; maxExceptions = 0; + total = 0; - trackExceptionBy(_: number, node: JobExceptionItemInterface) { + trackExceptionBy(_: number, node: ExceptionInfoInterface) { return node.timestamp; } loadMore() { @@ -52,13 +53,15 @@ export class JobExceptionsComponent implements OnInit { ) .subscribe(data => { // @ts-ignore - if (data['root-exception']) { - this.rootException = formatDate(data.timestamp, 'yyyy-MM-dd HH:mm:ss', 'en') + '\n' + data['root-exception']; + var exceptionHistory = data.exceptionHistory + if (exceptionHistory.entries.length > 0) { + var mostRecentException = exceptionHistory.entries[0] + this.rootException = 
formatDate(mostRecentException.timestamp, 'yyyy-MM-dd HH:mm:ss', 'en') + '\n' + mostRecentException.stacktrace; } else { this.rootException = 'No Root Exception'; } - this.truncated = data.truncated; - this.listOfException = data['all-exceptions']; + this.truncated = exceptionHistory.truncated; + this.listOfException = exceptionHistory.entries; }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index bb817dfa29bb4..cf9a3cdf50a55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -18,25 +18,34 @@ package org.apache.flink.runtime.rest.handler.job; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; 
+import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; -import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.curator4.com.google.common.collect.Iterables; + +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -46,12 +55,13 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** Handler serving the job exceptions. */ public class JobExceptionsHandler - extends AbstractAccessExecutionGraphHandler< - JobExceptionsInfo, JobExceptionsMessageParameters> - implements OnlyExecutionGraphJsonArchivist { + extends AbstractExecutionGraphHandler< + JobExceptionsInfoWithHistory, JobExceptionsMessageParameters> + implements JsonArchivist { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; @@ -59,7 +69,10 @@ public JobExceptionsHandler( GatewayRetriever leaderRetriever, Time timeout, Map responseHeaders, - MessageHeaders + MessageHeaders< + EmptyRequestBody, + JobExceptionsInfoWithHistory, + JobExceptionsMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { @@ -74,9 +87,9 @@ public JobExceptionsHandler( } @Override - protected JobExceptionsInfo handleRequest( + protected JobExceptionsInfoWithHistory handleRequest( HandlerRequest request, - AccessExecutionGraph executionGraph) { + ExecutionGraphInfo executionGraph) { List exceptionToReportMaxSizes = request.getQueryParameter(UpperLimitExceptionParameter.class); final 
int exceptionToReportMaxSize = @@ -87,24 +100,25 @@ protected JobExceptionsInfo handleRequest( } @Override - public Collection archiveJsonWithPath(AccessExecutionGraph graph) + public Collection archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) throws IOException { - ResponseBody json = createJobExceptionsInfo(graph, MAX_NUMBER_EXCEPTION_TO_REPORT); + ResponseBody json = + createJobExceptionsInfo(executionGraphInfo, MAX_NUMBER_EXCEPTION_TO_REPORT); String path = getMessageHeaders() .getTargetRestEndpointURL() - .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + .replace( + ':' + JobIDPathParameter.KEY, + executionGraphInfo.getJobId().toString()); return Collections.singletonList(new ArchivedJson(path, json)); } - private static JobExceptionsInfo createJobExceptionsInfo( - AccessExecutionGraph executionGraph, int exceptionToReportMaxSize) { - ErrorInfo rootException = executionGraph.getFailureInfo(); - String rootExceptionMessage = null; - Long rootTimestamp = null; - if (rootException != null) { - rootExceptionMessage = rootException.getExceptionAsString(); - rootTimestamp = rootException.getTimestamp(); + private static JobExceptionsInfoWithHistory createJobExceptionsInfo( + ExecutionGraphInfo executionGraphInfo, int exceptionToReportMaxSize) { + final ArchivedExecutionGraph executionGraph = + executionGraphInfo.getArchivedExecutionGraph(); + if (executionGraph.getFailureInfo() == null) { + return new JobExceptionsInfoWithHistory(); } List taskExceptionList = new ArrayList<>(); @@ -118,10 +132,7 @@ private static JobExceptionsInfo createJobExceptionsInfo( } TaskManagerLocation location = task.getCurrentAssignedResourceLocation(); - String locationString = - location != null - ? 
location.getFQDNHostname() + ':' + location.dataPort() - : "(unassigned)"; + String locationString = toString(location); long timestamp = task.getStateTimestamp(ExecutionState.FAILED); taskExceptionList.add( new JobExceptionsInfo.ExecutionExceptionInfo( @@ -132,7 +143,76 @@ private static JobExceptionsInfo createJobExceptionsInfo( } } - return new JobExceptionsInfo( - rootExceptionMessage, rootTimestamp, taskExceptionList, truncated); + final ErrorInfo rootCause = executionGraph.getFailureInfo(); + return new JobExceptionsInfoWithHistory( + rootCause.getExceptionAsString(), + rootCause.getTimestamp(), + taskExceptionList, + truncated, + createJobExceptionHistory( + executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize)); + } + + static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory( + Iterable historyEntries, int limit) { + // we need to reverse the history to have a stable result when doing paging on it + final List reversedHistoryEntries = new ArrayList<>(); + Iterables.addAll(reversedHistoryEntries, historyEntries); + Collections.reverse(reversedHistoryEntries); + + List exceptionHistoryEntries = + reversedHistoryEntries.stream() + .limit(limit) + .map(JobExceptionsHandler::createExceptionInfo) + .collect(Collectors.toList()); + + return new JobExceptionsInfoWithHistory.JobExceptionHistory( + exceptionHistoryEntries, + exceptionHistoryEntries.size() < reversedHistoryEntries.size()); + } + + private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo( + ExceptionHistoryEntry historyEntry) { + if (historyEntry.isGlobal()) { + return new JobExceptionsInfoWithHistory.ExceptionInfo( + historyEntry.getException().getOriginalErrorClassName(), + historyEntry.getExceptionAsString(), + historyEntry.getTimestamp()); + } + + Preconditions.checkArgument( + historyEntry.getFailingTaskName() != null, + "The taskName must not be null for a non-global failure."); + Preconditions.checkArgument( + 
historyEntry.getTaskManagerLocation() != null, + "The location must not be null for a non-global failure."); + + return new JobExceptionsInfoWithHistory.ExceptionInfo( + historyEntry.getException().getOriginalErrorClassName(), + historyEntry.getExceptionAsString(), + historyEntry.getTimestamp(), + historyEntry.getFailingTaskName(), + toString(historyEntry.getTaskManagerLocation())); + } + + @VisibleForTesting + static String toString(@Nullable TaskManagerLocation location) { + // '(unassigned)' being the default value is added to support backward-compatibility for the + // deprecated fields + return location != null + ? taskManagerLocationToString(location.getFQDNHostname(), location.dataPort()) + : "(unassigned)"; + } + + @VisibleForTesting + @Nullable + static String toString(@Nullable ExceptionHistoryEntry.ArchivedTaskManagerLocation location) { + return location != null + ? taskManagerLocationToString(location.getFQDNHostname(), location.getPort()) + : null; + } + + private static String taskManagerLocationToString(String fqdnHostname, int port) { + return String.format("%s:%d", fqdnHostname, port); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java index e6b8018f2d360..364809be7da84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.messages; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; @@ -27,7 +28,7 @@ /** Message headers for the {@link JobExceptionsHandler}. 
*/ public class JobExceptionsHeaders implements MessageHeaders< - EmptyRequestBody, JobExceptionsInfo, JobExceptionsMessageParameters> { + EmptyRequestBody, JobExceptionsInfoWithHistory, JobExceptionsMessageParameters> { private static final JobExceptionsHeaders INSTANCE = new JobExceptionsHeaders(); @@ -41,8 +42,8 @@ public Class getRequestClass() { } @Override - public Class getResponseClass() { - return JobExceptionsInfo.class; + public Class getResponseClass() { + return JobExceptionsInfoWithHistory.class; } @Override @@ -71,7 +72,14 @@ public static JobExceptionsHeaders getInstance() { @Override public String getDescription() { - return "Returns the non-recoverable exceptions that have been observed by the job. The truncated flag defines " + "whether more exceptions occurred, but are not listed, because the response would otherwise get too big."; + return String.format( + "Returns the most recent exceptions that have been handled by Flink for this job. The " + + "'exceptionHistory.truncated' flag defines whether exceptions were filtered " + + "out through the GET parameter. The backend collects only a specific amount " + + "of most recent exceptions per job. This can be configured through %s in the " + + "Flink configuration. The following first-level members are deprecated: " + + "'root-exception', 'timestamp', 'all-exceptions', 'truncated'. 
Use the data provided " + + "through 'exceptionHistory', instead.", + JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE.key()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java index 7450390c4c83e..2defb019b1181 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -27,24 +26,44 @@ import java.util.List; import java.util.Objects; +import java.util.StringJoiner; -/** Response type of the {@link JobExceptionsHandler}. */ -public class JobExceptionsInfo implements ResponseBody { +/** + * {@code JobExceptionInfo} holds the information for single failure which caused a (maybe partial) + * job restart. + */ +public class JobExceptionsInfo { public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception"; public static final String FIELD_NAME_TIMESTAMP = "timestamp"; public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions"; public static final String FIELD_NAME_TRUNCATED = "truncated"; + /** + * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. + */ + @Deprecated @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) private final String rootException; + /** + * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. + */ + @Deprecated @JsonProperty(FIELD_NAME_TIMESTAMP) private final Long rootTimestamp; + /** + * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. 
+ */ + @Deprecated @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) private final List allExceptions; + /** + * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. + */ + @Deprecated @JsonProperty(FIELD_NAME_TRUNCATED) private final boolean truncated; @@ -80,6 +99,16 @@ public int hashCode() { return Objects.hash(rootException, rootTimestamp, allExceptions, truncated); } + @Override + public String toString() { + return new StringJoiner(", ", JobExceptionsInfo.class.getSimpleName() + "[", "]") + .add("rootException='" + rootException + "'") + .add("rootTimestamp=" + rootTimestamp) + .add("allExceptions=" + allExceptions) + .add("truncated=" + truncated) + .toString(); + } + @JsonIgnore public String getRootException() { return rootException; @@ -104,7 +133,14 @@ public boolean isTruncated() { // Static helper classes // --------------------------------------------------------------------------------- - /** Nested class to encapsulate the task execution exception. */ + /** + * Nested class to encapsulate the task execution exception. + * + * @deprecated {@code ExecutionExceptionInfo} will be replaced by {@link + * JobExceptionsInfoWithHistory.ExceptionInfo} as part of the effort of deprecating {@link + * JobExceptionsInfo#allExceptions}. 
+ */ + @Deprecated public static final class ExecutionExceptionInfo { public static final String FIELD_NAME_EXCEPTION = "exception"; public static final String FIELD_NAME_TASK = "task"; @@ -155,5 +191,15 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(timestamp, exception, task, location); } + + @Override + public String toString() { + return new StringJoiner(", ", ExecutionExceptionInfo.class.getSimpleName() + "[", "]") + .add("exception='" + exception + "'") + .add("task='" + task + "'") + .add("location='" + location + "'") + .add("timestamp=" + timestamp) + .toString(); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java new file mode 100644 index 0000000000000..bd0b2aa374bb1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; + +import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo} providing a history of + * previously caused failures. It's the response type of the {@link JobExceptionsHandler}. 
+ */ +public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements ResponseBody { + + public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory"; + + @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) + private final JobExceptionHistory exceptionHistory; + + @JsonCreator + public JobExceptionsInfoWithHistory( + @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException, + @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp, + @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List allExceptions, + @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated, + @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) { + super(rootException, rootTimestamp, allExceptions, truncated); + this.exceptionHistory = exceptionHistory; + } + + public JobExceptionsInfoWithHistory() { + this( + null, + null, + Collections.emptyList(), + false, + new JobExceptionHistory(Collections.emptyList(), false)); + } + + @JsonIgnore + public JobExceptionHistory getExceptionHistory() { + return exceptionHistory; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o; + return this.isTruncated() == that.isTruncated() + && Objects.equals(this.getRootException(), that.getRootException()) + && Objects.equals(this.getRootTimestamp(), that.getRootTimestamp()) + && Objects.equals(this.getAllExceptions(), that.getAllExceptions()) + && Objects.equals(exceptionHistory, that.exceptionHistory); + } + + @Override + public int hashCode() { + return Objects.hash( + isTruncated(), + getRootException(), + getRootTimestamp(), + getAllExceptions(), + exceptionHistory); + } + + @Override + public String toString() { + return new StringJoiner(", ", JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]") + .add("rootException='" + getRootException() + "'") + .add("rootTimestamp=" + 
getRootTimestamp()) + .add("allExceptions=" + getAllExceptions()) + .add("truncated=" + isTruncated()) + .add("exceptionHistory=" + exceptionHistory) + .toString(); + } + + /** {@code JobExceptionHistory} collects all previously caught errors. */ + public static final class JobExceptionHistory { + + public static final String FIELD_NAME_ENTRIES = "entries"; + public static final String FIELD_NAME_TRUNCATED = "truncated"; + + @JsonProperty(FIELD_NAME_ENTRIES) + private final List entries; + + @JsonProperty(FIELD_NAME_TRUNCATED) + private final boolean truncated; + + @JsonCreator + public JobExceptionHistory( + @JsonProperty(FIELD_NAME_ENTRIES) List entries, + @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) { + this.entries = entries; + this.truncated = truncated; + } + + @JsonIgnore + public List getEntries() { + return entries; + } + + @JsonIgnore + public boolean isTruncated() { + return truncated; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobExceptionHistory that = (JobExceptionHistory) o; + return this.isTruncated() == that.isTruncated() + && Objects.equals(entries, that.entries); + } + + @Override + public int hashCode() { + return Objects.hash(entries, truncated); + } + + @Override + public String toString() { + return new StringJoiner(", ", JobExceptionHistory.class.getSimpleName() + "[", "]") + .add("entries=" + entries) + .add("truncated=" + truncated) + .toString(); + } + } + + /** Collects the information of a single exception. 
*/ + public static class ExceptionInfo { + + public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName"; + public static final String FIELD_NAME_EXCEPTION_STACKTRACE = "stacktrace"; + public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = "timestamp"; + public static final String FIELD_NAME_TASK_NAME = "taskName"; + public static final String FIELD_NAME_LOCATION = "location"; + + @JsonProperty(FIELD_NAME_EXCEPTION_NAME) + private final String exceptionName; + + @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) + private final String stacktrace; + + @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) + private final long timestamp; + + @JsonInclude(NON_NULL) + @JsonProperty(FIELD_NAME_TASK_NAME) + @Nullable + private final String taskName; + + @JsonInclude(NON_NULL) + @JsonProperty(FIELD_NAME_LOCATION) + @Nullable + private final String location; + + public ExceptionInfo(String exceptionName, String stacktrace, long timestamp) { + this(exceptionName, stacktrace, timestamp, null, null); + } + + @JsonCreator + public ExceptionInfo( + @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName, + @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace, + @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp, + @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName, + @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location) { + this.exceptionName = checkNotNull(exceptionName); + this.stacktrace = checkNotNull(stacktrace); + this.timestamp = timestamp; + this.taskName = taskName; + this.location = location; + } + + @JsonIgnore + public String getExceptionName() { + return exceptionName; + } + + @JsonIgnore + public String getStacktrace() { + return stacktrace; + } + + @JsonIgnore + public long getTimestamp() { + return timestamp; + } + + @JsonIgnore + @Nullable + public String getTaskName() { + return taskName; + } + + @JsonIgnore + @Nullable + public String getLocation() { + return location; + } + + @Override + public boolean 
equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExceptionInfo that = (ExceptionInfo) o; + return exceptionName.equals(that.exceptionName) + && stacktrace.equals(that.stacktrace) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(taskName, that.taskName) + && Objects.equals(location, that.location); + } + + @Override + public int hashCode() { + return Objects.hash(exceptionName, stacktrace, timestamp, taskName, location); + } + + @Override + public String toString() { + return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() + "[", "]") + .add("exceptionName='" + exceptionName + "'") + .add("stacktrace='" + stacktrace + "'") + .add("timestamp=" + timestamp) + .add("taskName='" + taskName + "'") + .add("location='" + location + "'") + .toString(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java new file mode 100644 index 0000000000000..83f615417407b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.StringJoiner; + +/** + * {@code ExceptionHistoryEntry} collects information about a single failure that triggered the + * scheduler's failure handling. + */ +public class ExceptionHistoryEntry extends ErrorInfo { + + private static final long serialVersionUID = -3855285510064263701L; + + @Nullable private final String failingTaskName; + @Nullable private final ArchivedTaskManagerLocation taskManagerLocation; + + /** + * Creates an {@code ExceptionHistoryEntry} representing a global failure from the passed {@code + * Throwable} and timestamp. + * + * @param cause The reason for the failure. + * @param timestamp The time the failure was caught. + * @return The {@code ExceptionHistoryEntry} instance. + */ + public static ExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) { + return new ExceptionHistoryEntry(cause, timestamp, null, null); + } + + /** + * Creates an {@code ExceptionHistoryEntry} representing a local failure using the passed + * information. + * + * @param execution The {@link AccessExecution} that caused the failure. + * @param failingTaskName The name of the task the {@code execution} is connected to. + * @return The {@code ExceptionHistoryEntry} instance. 
+ */ + public static ExceptionHistoryEntry fromFailedExecution( + AccessExecution execution, String failingTaskName) { + ErrorInfo failureInfo = + execution + .getFailureInfo() + .orElseThrow( + () -> + new IllegalArgumentException( + "The passed Execution does not provide a failureCause.")); + return new ExceptionHistoryEntry( + failureInfo.getException(), + failureInfo.getTimestamp(), + failingTaskName, + ArchivedTaskManagerLocation.fromTaskManagerLocation( + execution.getAssignedResourceLocation())); + } + + @VisibleForTesting + public ExceptionHistoryEntry( + Throwable cause, + long timestamp, + @Nullable String failingTaskName, + @Nullable ArchivedTaskManagerLocation taskManagerLocation) { + super(cause, timestamp); + this.failingTaskName = failingTaskName; + this.taskManagerLocation = taskManagerLocation; + } + + public boolean isGlobal() { + return failingTaskName == null; + } + + @Nullable + public String getFailingTaskName() { + return failingTaskName; + } + + @Nullable + public ArchivedTaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + /** + * {@code ArchivedTaskManagerLocation} represents an archived (static) version of a {@link + * TaskManagerLocation}. It overcomes the issue with {@link TaskManagerLocation#inetAddress} + * being partially transient due to the cache becoming outdated. + */ + public static class ArchivedTaskManagerLocation implements Serializable { + + private static final long serialVersionUID = -6596854145482446664L; + + private final ResourceID resourceID; + private final String addressStr; + private final int port; + private final String hostname; + private final String fqdnHostname; + + /** + * Creates an {@code ArchivedTaskManagerLocation} copy of the passed {@link + * TaskManagerLocation}. + * + * @param taskManagerLocation The {@code TaskManagerLocation} that's going to be copied. + * @return The corresponding {@code ArchivedTaskManagerLocation} or {@code null} if {@code + * null} was passed. 
+ */ + @VisibleForTesting + @Nullable + public static ArchivedTaskManagerLocation fromTaskManagerLocation( + TaskManagerLocation taskManagerLocation) { + if (taskManagerLocation == null) { + return null; + } + + return new ArchivedTaskManagerLocation( + taskManagerLocation.getResourceID(), + taskManagerLocation.addressString(), + taskManagerLocation.dataPort(), + taskManagerLocation.getHostname(), + taskManagerLocation.getFQDNHostname()); + } + + private ArchivedTaskManagerLocation( + ResourceID resourceID, + String addressStr, + int port, + String hostname, + String fqdnHost) { + this.resourceID = resourceID; + this.addressStr = addressStr; + this.port = port; + this.hostname = hostname; + this.fqdnHostname = fqdnHost; + } + + public ResourceID getResourceID() { + return resourceID; + } + + public String getAddress() { + return addressStr; + } + + public int getPort() { + return port; + } + + public String getHostname() { + return hostname; + } + + public String getFQDNHostname() { + return fqdnHostname; + } + + @Override + public String toString() { + return new StringJoiner( + ", ", ArchivedTaskManagerLocation.class.getSimpleName() + "[", "]") + .add("resourceID=" + resourceID) + .add("addressStr='" + addressStr + "'") + .add("port=" + port) + .add("hostname='" + hostname + "'") + .add("fqdnHostname='" + fqdnHostname + "'") + .toString(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java index 4b7b746e5b41f..d43a35549bcee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java @@ -20,12 +20,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.executiongraph.ErrorInfo; import 
org.apache.flink.runtime.executiongraph.ExecutionGraph; import java.io.Serializable; import java.util.Collections; -import java.util.List; /** * {@code ExecutionGraphInfo} serves as a composite class that provides different {@link @@ -36,14 +34,15 @@ public class ExecutionGraphInfo implements Serializable { private static final long serialVersionUID = -6134203195124124202L; private final ArchivedExecutionGraph executionGraph; - private final List exceptionHistory; + private final Iterable exceptionHistory; public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) { this(executionGraph, Collections.emptyList()); } public ExecutionGraphInfo( - ArchivedExecutionGraph executionGraph, List exceptionHistory) { + ArchivedExecutionGraph executionGraph, + Iterable exceptionHistory) { this.executionGraph = executionGraph; this.exceptionHistory = exceptionHistory; } @@ -56,7 +55,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph() { return executionGraph; } - public List getExceptionHistory() { + public Iterable getExceptionHistory() { return exceptionHistory; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9aeabd46f2675..c08ea6a246517 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -43,7 +44,6 @@ import org.apache.flink.runtime.execution.ExecutionState; import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore; -import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -79,6 +79,7 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.util.BoundedFIFOQueue; import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -141,7 +142,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private final ComponentMainThreadExecutor mainThreadExecutor; - private final List taskFailureHistory = new ArrayList<>(); + private final BoundedFIFOQueue exceptionHistory; private final ExecutionGraphFactory executionGraphFactory; @@ -206,6 +207,10 @@ public SchedulerBase( this.operatorCoordinatorHandler = new OperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor); + exceptionHistory = + new BoundedFIFOQueue<>( + jobMasterConfiguration.getInteger( + JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE)); } private void registerShutDownCheckpointServicesOnExecutionGraphTermination( @@ -535,7 +540,7 @@ protected final void archiveGlobalFailure(@Nullable Throwable failure) { } protected final void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) { - taskFailureHistory.add(ErrorInfo.createErrorInfoWithNullableCause(failure, timestamp)); + exceptionHistory.add(ExceptionHistoryEntry.fromGlobalFailure(failure, timestamp)); log.debug("Archive global failure.", failure); } 
@@ -549,16 +554,15 @@ protected final void archiveFromFailureHandlingResult( if (executionOptional.isPresent()) { final Execution failedExecution = executionOptional.get(); - failedExecution - .getFailureInfo() - .ifPresent( - failureInfo -> { - taskFailureHistory.add(failureInfo); - log.debug( - "Archive local failure causing attempt {} to fail: {}", - failedExecution.getAttemptId(), - failureInfo.getExceptionAsString()); - }); + final ExceptionHistoryEntry exceptionHistoryEntry = + ExceptionHistoryEntry.fromFailedExecution( + failedExecution, + failedExecution.getVertex().getTaskNameWithSubtaskIndex()); + exceptionHistory.add(exceptionHistoryEntry); + log.debug( + "Archive local failure causing attempt {} to fail: {}", + failedExecution.getAttemptId(), + exceptionHistoryEntry.getExceptionAsString()); } else { // fallback in case of a global fail over - no failed state is set and, therefore, no // timestamp was taken @@ -646,9 +650,17 @@ public final void notifyPartitionDataAvailable(final ResultPartitionID partition protected void notifyPartitionDataAvailableInternal( IntermediateResultPartitionID resultPartitionId) {} + /** + * Returns a copy of the current history of task failures. + * + * @return a copy of the current history of task failures. 
+ */ @VisibleForTesting - protected List getExceptionHistory() { - return taskFailureHistory; + protected Iterable getExceptionHistory() { + final Collection copy = new ArrayList<>(exceptionHistory.size()); + exceptionHistory.forEach(copy::add); + + return copy; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java new file mode 100644 index 0000000000000..aa8f81a914acd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BoundedFIFOQueue.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; + +/** + * {@code BoundedFIFOQueue} collects elements up to a given amount. Reaching this limit will result in + * removing the oldest element from this queue (First-In/First-Out; FIFO). + * + * @param The type of elements collected. 
+ */ +public class BoundedFIFOQueue implements Iterable, Serializable { + + private static final long serialVersionUID = -890727339944580409L; + + private final int maxSize; + private final Queue elements; + + /** + * Creates a {@code BoundedFIFOQueue} with the given maximum size. + * + * @param maxSize The maximum size of this queue. Exceeding this limit would result in removing + * the oldest element (FIFO). + * @throws IllegalArgumentException If {@code maxSize} is less than 0. + */ + public BoundedFIFOQueue(int maxSize) { + Preconditions.checkArgument(maxSize >= 0, "The maximum size should be at least 0."); + + this.maxSize = maxSize; + this.elements = new LinkedList<>(); + } + + /** + * Adds an element to the end of the queue. An element will be removed from the head of the + * queue if the queue would exceed its maximum size by adding the new element. + * + * @param element The element that should be added to the end of the queue. + * @throws NullPointerException If {@code null} is passed as an element. + */ + public void add(T element) { + Preconditions.checkNotNull(element); + if (elements.add(element) && elements.size() > maxSize) { + elements.poll(); + } + } + + /** + * Returns the number of currently stored elements. + * + * @return The number of currently stored elements. + */ + public int size() { + return this.elements.size(); + } + + /** + * Returns the {@code BoundedFIFOQueue}'s {@link Iterator}. + * + * @return The queue's {@code Iterator}. 
+ */ + @Override + public Iterator iterator() { + return elements.iterator(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index bdbd8ba183a11..dd1e5ab3f30bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecution; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; @@ -37,68 +36,245 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; +import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import 
org.apache.flink.runtime.util.EvictingBoundedList; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.collection.IsEmptyCollection.empty; +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** Test for the {@link JobExceptionsHandler}. 
*/ public class JobExceptionsHandlerTest extends TestLogger { + private final JobExceptionsHandler testInstance = + new JobExceptionsHandler( + CompletableFuture::new, + TestingUtils.TIMEOUT(), + Collections.emptyMap(), + JobExceptionsHeaders.getInstance(), + new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), + TestingUtils.defaultExecutor()); + + @Test + public void testNoExceptions() throws HandlerRequestException { + final ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().build()); + + final HandlerRequest request = + createRequest(executionGraphInfo.getJobId(), 10); + final JobExceptionsInfoWithHistory response = + testInstance.handleRequest(request, executionGraphInfo); + + assertThat(response.getRootException(), is(nullValue())); + assertThat(response.getRootTimestamp(), is(nullValue())); + assertFalse(response.isTruncated()); + assertThat(response.getAllExceptions(), empty()); + assertThat(response.getExceptionHistory().getEntries(), empty()); + } + + @Test + public void testOnlyRootCause() throws HandlerRequestException { + final Throwable rootCause = new RuntimeException("root cause"); + final long rootCauseTimestamp = System.currentTimeMillis(); + + final ExecutionGraphInfo executionGraphInfo = + createExecutionGraphInfo( + ExceptionHistoryEntry.fromGlobalFailure(rootCause, rootCauseTimestamp)); + final HandlerRequest request = + createRequest(executionGraphInfo.getJobId(), 10); + final JobExceptionsInfoWithHistory response = + testInstance.handleRequest(request, executionGraphInfo); + + assertThat(response.getRootException(), is(ExceptionUtils.stringifyException(rootCause))); + assertThat(response.getRootTimestamp(), is(rootCauseTimestamp)); + assertFalse(response.isTruncated()); + assertThat(response.getAllExceptions(), empty()); + + assertThat( + response.getExceptionHistory().getEntries(), + contains(historyContainsGlobalFailure(rootCause, rootCauseTimestamp))); + } + + 
@Test + public void testWithExceptionHistory() throws HandlerRequestException { + final ExceptionHistoryEntry rootCause = + ExceptionHistoryEntry.fromGlobalFailure( + new RuntimeException("exception #0"), System.currentTimeMillis()); + final ExceptionHistoryEntry otherFailure = + new ExceptionHistoryEntry( + new RuntimeException("exception #1"), + System.currentTimeMillis(), + "task name", + ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( + new LocalTaskManagerLocation())); + + final ExecutionGraphInfo executionGraphInfo = + createExecutionGraphInfo(rootCause, otherFailure); + final HandlerRequest request = + createRequest(executionGraphInfo.getJobId(), 10); + final JobExceptionsInfoWithHistory response = + testInstance.handleRequest(request, executionGraphInfo); + + assertThat( + response.getExceptionHistory().getEntries(), + contains( + historyContainsGlobalFailure( + rootCause.getException(), rootCause.getTimestamp()), + historyContainsJobExceptionInfo( + otherFailure.getException(), + otherFailure.getTimestamp(), + otherFailure.getFailingTaskName(), + JobExceptionsHandler.toString( + otherFailure.getTaskManagerLocation())))); + assertFalse(response.getExceptionHistory().isTruncated()); + } + + @Test + public void testWithExceptionHistoryWithTruncationThroughParameter() + throws HandlerRequestException { + final ExceptionHistoryEntry rootCause = + ExceptionHistoryEntry.fromGlobalFailure( + new RuntimeException("exception #0"), System.currentTimeMillis()); + final ExceptionHistoryEntry otherFailure = + new ExceptionHistoryEntry( + new RuntimeException("exception #1"), + System.currentTimeMillis(), + "task name", + ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( + new LocalTaskManagerLocation())); + + final ExecutionGraphInfo executionGraphInfo = + createExecutionGraphInfo(rootCause, otherFailure); + final HandlerRequest request = + createRequest(executionGraphInfo.getJobId(), 1); + final 
JobExceptionsInfoWithHistory response = + testInstance.handleRequest(request, executionGraphInfo); + + assertThat( + response.getExceptionHistory().getEntries(), + contains( + historyContainsGlobalFailure( + rootCause.getException(), rootCause.getTimestamp()))); + assertThat(response.getExceptionHistory().getEntries(), iterableWithSize(1)); + assertTrue(response.getExceptionHistory().isTruncated()); + } + + @Test + public void testTaskManagerLocationFallbackHandling() { + assertThat(JobExceptionsHandler.toString((TaskManagerLocation) null), is("(unassigned)")); + } + + @Test + public void testTaskManagerLocationHandling() { + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + assertThat( + JobExceptionsHandler.toString(taskManagerLocation), + is( + String.format( + "%s:%s", + taskManagerLocation.getFQDNHostname(), + taskManagerLocation.dataPort()))); + } + + @Test + public void testArchivedTaskManagerLocationFallbackHandling() { + assertThat( + JobExceptionsHandler.toString( + (ExceptionHistoryEntry.ArchivedTaskManagerLocation) null), + is(nullValue())); + } + + @Test + public void testArchivedTaskManagerLocationHandling() { + final ExceptionHistoryEntry.ArchivedTaskManagerLocation taskManagerLocation = + ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( + new LocalTaskManagerLocation()); + assertThat( + JobExceptionsHandler.toString(taskManagerLocation), + is( + String.format( + "%s:%s", + taskManagerLocation.getFQDNHostname(), + taskManagerLocation.getPort()))); + } + @Test public void testGetJobExceptionsInfo() throws HandlerRequestException { - final JobExceptionsHandler jobExceptionsHandler = - new JobExceptionsHandler( - () -> null, - TestingUtils.TIMEOUT(), - Collections.emptyMap(), - JobExceptionsHeaders.getInstance(), - new DefaultExecutionGraphCache( - TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), - TestingUtils.defaultExecutor()); final int numExceptions = 20; - final AccessExecutionGraph 
archivedExecutionGraph = - createAccessExecutionGraph(numExceptions); - checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, numExceptions, 10); - checkExceptionLimit( - jobExceptionsHandler, archivedExecutionGraph, numExceptions, numExceptions); - checkExceptionLimit(jobExceptionsHandler, archivedExecutionGraph, numExceptions, 30); + final ExecutionGraphInfo archivedExecutionGraph = createAccessExecutionGraph(numExceptions); + checkExceptionLimit(testInstance, archivedExecutionGraph, numExceptions, 10); + checkExceptionLimit(testInstance, archivedExecutionGraph, numExceptions, numExceptions); + checkExceptionLimit(testInstance, archivedExecutionGraph, numExceptions, 30); } private static void checkExceptionLimit( JobExceptionsHandler jobExceptionsHandler, - AccessExecutionGraph graph, + ExecutionGraphInfo graph, int maxNumExceptions, int numExpectedException) throws HandlerRequestException { final HandlerRequest handlerRequest = - createRequest(graph.getJobID(), numExpectedException); + createRequest(graph.getJobId(), numExpectedException); final JobExceptionsInfo jobExceptionsInfo = jobExceptionsHandler.handleRequest(handlerRequest, graph); - final int numReportedException = - maxNumExceptions >= numExpectedException ? 
numExpectedException : maxNumExceptions; + final int numReportedException = Math.min(maxNumExceptions, numExpectedException); assertEquals(jobExceptionsInfo.getAllExceptions().size(), numReportedException); } - private static AccessExecutionGraph createAccessExecutionGraph(int numTasks) { + private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) { Map tasks = new HashMap<>(); for (int i = 0; i < numTasks; i++) { final JobVertexID jobVertexId = new JobVertexID(); tasks.put(jobVertexId, createArchivedExecutionJobVertex(jobVertexId)); } - return new ArchivedExecutionGraphBuilder().setTasks(tasks).build(); + + final Throwable failureCause = new RuntimeException("root cause"); + final long failureTimestamp = System.currentTimeMillis(); + final List exceptionHistory = + Collections.singletonList( + new ExceptionHistoryEntry( + failureCause, + failureTimestamp, + "test task #1", + ExceptionHistoryEntry.ArchivedTaskManagerLocation + .fromTaskManagerLocation(new LocalTaskManagerLocation()))); + return new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setFailureCause(new ErrorInfo(failureCause, failureTimestamp)) + .setTasks(tasks) + .build(), + exceptionHistory); } private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex( @@ -141,6 +317,32 @@ private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex( emptyAccumulators); } + // -------- exception history related utility methods for creating the input data -------- + + private static ExecutionGraphInfo createExecutionGraphInfo( + ExceptionHistoryEntry... 
historyEntries) { + final ArchivedExecutionGraphBuilder executionGraphBuilder = + new ArchivedExecutionGraphBuilder(); + final List historyEntryCollection = new ArrayList<>(); + + for (int i = 0; i < historyEntries.length; i++) { + if (i == 0) { + // first entry is root cause + executionGraphBuilder.setFailureCause( + new ErrorInfo( + historyEntries[i].getException(), + historyEntries[i].getTimestamp())); + } + + historyEntryCollection.add(historyEntries[i]); + } + + // we have to reverse it to simulate how the Scheduler collects it + Collections.reverse(historyEntryCollection); + + return new ExecutionGraphInfo(executionGraphBuilder.build(), historyEntryCollection); + } + private static HandlerRequest createRequest( JobID jobId, int size) throws HandlerRequestException { final Map pathParameters = new HashMap<>(); @@ -154,4 +356,138 @@ private static HandlerRequest pathParameters, queryParameters); } + + // -------- factory methods for instantiating new Matchers -------- + + private static Matcher historyContainsJobExceptionInfo( + Throwable expectedFailureCause, + long expectedFailureTimestamp, + String expectedTaskNameWithSubtaskId, + String expectedTaskManagerLocation) { + return new ExceptionInfoMatcher( + expectedFailureCause, + expectedFailureTimestamp, + expectedTaskNameWithSubtaskId, + expectedTaskManagerLocation); + } + + private static Matcher historyContainsGlobalFailure( + Throwable expectedFailureCause, long expectedFailureTimestamp) { + return historyContainsJobExceptionInfo( + expectedFailureCause, expectedFailureTimestamp, null, null); + } + + // -------- Matcher implementations used in this test class -------- + + /** Checks the given {@link ExceptionInfo} instance. 
*/ + private static class ExceptionInfoMatcher extends TypeSafeDiagnosingMatcher { + + private final Throwable expectedException; + private final long expectedTimestamp; + private final String expectedTaskName; + private final String expectedLocation; + + public ExceptionInfoMatcher( + Throwable expectedException, + long expectedTimestamp, + String expectedTaskName, + String expectedLocation) { + this.expectedException = deserializeSerializedThrowable(expectedException); + this.expectedTimestamp = expectedTimestamp; + this.expectedTaskName = expectedTaskName; + this.expectedLocation = expectedLocation; + } + + @Override + public void describeTo(Description description) { + description + .appendText("exceptionName=") + .appendText(getExpectedExceptionName()) + .appendText(", exceptionStacktrace=") + .appendText(getExpectedStacktrace()) + .appendText(", timestamp=") + .appendText(String.valueOf(expectedTimestamp)) + .appendText(", taskName=") + .appendText(expectedTaskName) + .appendText(", location=") + .appendText(expectedLocation); + } + + private String getExpectedExceptionName() { + return expectedException.getClass().getName(); + } + + private String getExpectedStacktrace() { + return ExceptionUtils.stringifyException(expectedException); + } + + @Override + protected boolean matchesSafely(ExceptionInfo info, Description description) { + return matches( + info, + description, + ExceptionInfo::getExceptionName, + getExpectedExceptionName(), + "exceptionName") + && matches( + info, + description, + ExceptionInfo::getStacktrace, + ExceptionUtils.stringifyException(expectedException), + "stacktrace") + && matches( + info, + description, + ExceptionInfo::getTimestamp, + expectedTimestamp, + "timestamp") + && matches( + info, + description, + ExceptionInfo::getTaskName, + expectedTaskName, + "taskName") + && matches( + info, + description, + ExceptionInfo::getLocation, + expectedLocation, + "location"); + } + + private boolean matches( + ExceptionInfo info, + 
Description desc, + Function extractor, + R expectedValue, + String attributeName) { + final R actualValue = extractor.apply(info); + if (actualValue == null) { + return expectedValue == null; + } + + final boolean match = actualValue.equals(expectedValue); + if (!match) { + desc.appendText(attributeName) + .appendText("=") + .appendText(String.valueOf(actualValue)); + } + + return match; + } + + /** + * Utility method for unwrapping a {@link SerializedThrowable} again. + * + * @param throwable The {@code Throwable} that might be unwrapped. + * @return The unwrapped {@code Throwable} if the {@code throwable} was actually a {@code + * SerializedThrowable}; otherwise the {@code throwable} itself. + */ + protected Throwable deserializeSerializedThrowable(Throwable throwable) { + return throwable instanceof SerializedThrowable + ? ((SerializedThrowable) throwable) + .deserializeError(ClassLoader.getSystemClassLoader()) + : throwable; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java deleted file mode 100644 index d9b062768c0d1..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import java.util.ArrayList; -import java.util.List; - -/** Tests that the {@link JobExceptionsInfo} can be marshalled and unmarshalled. */ -public class JobExceptionsInfoTest extends RestResponseMarshallingTestBase { - @Override - protected Class getTestResponseClass() { - return JobExceptionsInfo.class; - } - - @Override - protected JobExceptionsInfo getTestResponseInstance() throws Exception { - List executionTaskExceptionInfoList = - new ArrayList<>(); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception1", "task1", "location1", System.currentTimeMillis())); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception2", "task2", "location2", System.currentTimeMillis())); - return new JobExceptionsInfo( - "root exception", - System.currentTimeMillis(), - executionTaskExceptionInfoList, - false); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java similarity index 55% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java index d2b9bfd5fee4e..3adf9425f6521 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java @@ -19,21 +19,22 @@ package org.apache.flink.runtime.rest.messages; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** - * Tests that the {@link JobExceptionsInfo} with no root exception can be marshalled and + * Tests that the {@link JobExceptionsInfoWithHistory} with no root exception can be marshalled and * unmarshalled. */ -public class JobExceptionsInfoNoRootTest - extends RestResponseMarshallingTestBase { +public class JobExceptionsInfoWithHistoryNoRootTest + extends RestResponseMarshallingTestBase { @Override - protected Class getTestResponseClass() { - return JobExceptionsInfo.class; + protected Class getTestResponseClass() { + return JobExceptionsInfoWithHistory.class; } @Override - protected JobExceptionsInfo getTestResponseInstance() throws Exception { + protected JobExceptionsInfoWithHistory getTestResponseInstance() throws Exception { List executionTaskExceptionInfoList = new ArrayList<>(); executionTaskExceptionInfoList.add( @@ -42,6 +43,21 @@ protected JobExceptionsInfo getTestResponseInstance() throws Exception { executionTaskExceptionInfoList.add( new JobExceptionsInfo.ExecutionExceptionInfo( "exception2", "task2", "location2", System.currentTimeMillis())); - return new JobExceptionsInfo(null, null, executionTaskExceptionInfoList, false); + return new JobExceptionsInfoWithHistory( + null, + null, + executionTaskExceptionInfoList, + false, + new JobExceptionsInfoWithHistory.JobExceptionHistory( + Arrays.asList( + new JobExceptionsInfoWithHistory.ExceptionInfo( + "global failure #0", "stacktrace #0", 0L), + new JobExceptionsInfoWithHistory.ExceptionInfo( + "local task failure #1", + "stacktrace #1", + 1L, + "task name", + "location")), + false)); } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java new file mode 100644 index 0000000000000..b10bff2e0e9b2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.util.RestMapperUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.hamcrest.CoreMatchers; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; + +/** Tests that the {@link JobExceptionsInfoWithHistory} can be marshalled and unmarshalled. 
*/ +public class JobExceptionsInfoWithHistoryTest + extends RestResponseMarshallingTestBase { + @Override + protected Class getTestResponseClass() { + return JobExceptionsInfoWithHistory.class; + } + + @Override + protected JobExceptionsInfoWithHistory getTestResponseInstance() throws Exception { + List executionTaskExceptionInfoList = + new ArrayList<>(); + executionTaskExceptionInfoList.add( + new JobExceptionsInfo.ExecutionExceptionInfo( + "exception1", "task1", "location1", System.currentTimeMillis())); + executionTaskExceptionInfoList.add( + new JobExceptionsInfo.ExecutionExceptionInfo( + "exception2", "task2", "location2", System.currentTimeMillis())); + return new JobExceptionsInfoWithHistory( + "root exception", + System.currentTimeMillis(), + executionTaskExceptionInfoList, + false, + new JobExceptionsInfoWithHistory.JobExceptionHistory( + Collections.emptyList(), false)); + } + + /** + * {@code taskName} and {@code location} should not be exposed if not set. + * + * @throws JsonProcessingException is not expected to be thrown + */ + @Test + public void testNullFieldsNotSet() throws JsonProcessingException { + ObjectMapper objMapper = RestMapperUtils.getStrictObjectMapper(); + String json = + objMapper.writeValueAsString( + new JobExceptionsInfoWithHistory.ExceptionInfo( + "exception name", "stacktrace", 0L)); + + assertThat(json, not(CoreMatchers.containsString("taskName"))); + assertThat(json, not(CoreMatchers.containsString("location"))); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index cd2c06fdf8506..e1015e2d51ec0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobStatus; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -48,13 +49,16 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.ExecutorUtils; @@ -65,6 +69,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import org.apache.flink.shaded.guava18.com.google.common.collect.Range; +import org.hamcrest.collection.IsIterableWithSize; import org.hamcrest.core.Is; import org.junit.After; import org.junit.Before; @@ -1052,36 +1057,51 @@ public void testExceptionHistoryWithGlobalFailOver() { taskRestartExecutor.triggerScheduledTasks(); final long end = System.currentTimeMillis(); - final List actualExceptionHistory = scheduler.getExceptionHistory(); + final Iterable actualExceptionHistory = + 
scheduler.getExceptionHistory(); - assertThat(actualExceptionHistory, hasSize(1)); + assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1)); - final ErrorInfo failure = actualExceptionHistory.get(0); + final ExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); assertThat( failure.getException().deserializeError(ClassLoader.getSystemClassLoader()), is(expectedException)); assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start)); assertThat(failure.getTimestamp(), lessThanOrEqualTo(end)); + assertThat(failure.getTaskManagerLocation(), is(nullValue())); + assertThat(failure.getFailingTaskName(), is(nullValue())); } @Test public void testExceptionHistoryWithRestartableFailure() { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder(); + logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation); + final ExceptionHistoryEntry.ArchivedTaskManagerLocation + expectedArchivedTaskManagerLocation = + ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( + taskManagerLocation); + + executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(logicalSlotBuilder); + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); // initiate restartable failure - final ExecutionAttemptID restartableAttemptId = + final ArchivedExecutionVertex taskFailureExecutionVertex = Iterables.getOnlyElement( - scheduler - .requestJob() - .getArchivedExecutionGraph() - .getAllExecutionVertices()) - .getCurrentExecutionAttempt() - .getAttemptId(); + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices()); + final String expectedTaskName = taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(); final RuntimeException restartableException = new RuntimeException("restartable exception"); Range 
updateStateTriggeringRestartTimeframe = - initiateFailure(scheduler, restartableAttemptId, restartableException); + initiateFailure( + scheduler, + taskFailureExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), + restartableException); taskRestartExecutor.triggerNonPeriodicScheduledTask(); @@ -1097,14 +1117,17 @@ public void testExceptionHistoryWithRestartableFailure() { .getCurrentExecutionAttempt() .getAttemptId(); final RuntimeException failingException = new RuntimeException("failing exception"); - Range updateStateTriggeringJobFailureTimeframe = + final Range updateStateTriggeringJobFailureTimeframe = initiateFailure(scheduler, failingAttemptId, failingException); - List actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat(actualExceptionHistory.size(), is(2)); + final Iterable actualExceptionHistory = + scheduler.getExceptionHistory(); + assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(2)); + final Iterator exceptionHistoryIterator = + actualExceptionHistory.iterator(); // assert restarted attempt - ErrorInfo restartableFailure = actualExceptionHistory.get(0); + final ExceptionHistoryEntry restartableFailure = exceptionHistoryIterator.next(); assertThat( restartableFailure .getException() @@ -1116,10 +1139,15 @@ public void testExceptionHistoryWithRestartableFailure() { assertThat( restartableFailure.getTimestamp(), lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + assertThat(restartableFailure.getFailingTaskName(), is(expectedTaskName)); + assertThat( + restartableFailure.getTaskManagerLocation(), + ExceptionHistoryEntryTest.isArchivedTaskManagerLocation( + expectedArchivedTaskManagerLocation)); // assert job failure attempt - ErrorInfo globalFailure = actualExceptionHistory.get(1); - Throwable actualException = + final ExceptionHistoryEntry globalFailure = exceptionHistoryIterator.next(); + final Throwable actualException = 
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); assertThat(actualException, instanceOf(JobException.class)); assertThat(actualException, FlinkMatchers.containsCause(failingException)); @@ -1129,6 +1157,53 @@ public void testExceptionHistoryWithRestartableFailure() { assertThat( globalFailure.getTimestamp(), lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); + assertThat(globalFailure.getFailingTaskName(), is(nullValue())); + assertThat(globalFailure.getTaskManagerLocation(), is(nullValue())); + } + + @Test + public void testExceptionHistoryTruncation() { + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + + configuration.set(JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE, 1); + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID attemptId0 = + Iterables.getOnlyElement( + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + attemptId0, ExecutionState.FAILED, new RuntimeException("old exception"))); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + final ExecutionAttemptID attemptId1 = + Iterables.getOnlyElement( + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + final RuntimeException exception = new RuntimeException("relevant exception"); + scheduler.updateTaskExecutionState( + new TaskExecutionState(attemptId1, ExecutionState.FAILED, exception)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + final Iterator entryIterator = + scheduler.getExceptionHistory().iterator(); + assertTrue(entryIterator.hasNext()); + assertThat( + entryIterator + .next() + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()), + is(exception)); + 
assertFalse(entryIterator.hasNext()); } private static TaskExecutionState createFailedTaskExecutionState( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java new file mode 100644 index 0000000000000..141c5c9970ab4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.runtime.scheduler.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +/** {@code ExceptionHistoryEntryTest} tests the instantiation of {@link ExceptionHistoryEntry}. 
*/ +public class ExceptionHistoryEntryTest extends TestLogger { + + @Test + public void testFromGlobalFailure() { + final Throwable failureCause = new RuntimeException("failure cause"); + final long timestamp = System.currentTimeMillis(); + + final ExceptionHistoryEntry testInstance = + ExceptionHistoryEntry.fromGlobalFailure(failureCause, timestamp); + + assertThat( + testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()), + is(failureCause)); + assertThat(testInstance.getTimestamp(), is(timestamp)); + assertThat(testInstance.getFailingTaskName(), is(nullValue())); + assertThat(testInstance.getTaskManagerLocation(), is(nullValue())); + } + + @Test + public void testFromFailedExecution() { + final Throwable failureCause = new RuntimeException("Expected failure"); + final long failureTimestamp = System.currentTimeMillis(); + final String taskNameWithSubTaskIndex = "task name"; + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + + final AccessExecution failedExecution = + new TestingExecution( + new ErrorInfo(failureCause, failureTimestamp), taskManagerLocation); + final ExceptionHistoryEntry testInstance = + ExceptionHistoryEntry.fromFailedExecution( + failedExecution, taskNameWithSubTaskIndex); + + assertThat( + testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()), + is(failureCause)); + assertThat(testInstance.getTimestamp(), is(failureTimestamp)); + assertThat(testInstance.getFailingTaskName(), is(taskNameWithSubTaskIndex)); + assertThat( + testInstance.getTaskManagerLocation(), + isArchivedTaskManagerLocation(fromTaskManagerLocation(taskManagerLocation))); + } + + @Test(expected = IllegalArgumentException.class) + public void testFromFailedExecutionWithoutFailure() { + final AccessExecution executionWithoutFailure = + new TestingExecution(null, new LocalTaskManagerLocation()); + ExceptionHistoryEntry.fromFailedExecution(executionWithoutFailure, "task name"); + } + + /** + * 
{@code TestingExecution} mocks {@link AccessExecution} to provide the relevant methods for + * testing {@link ExceptionHistoryEntry#fromFailedExecution(AccessExecution, String)}. + */ + private static class TestingExecution implements AccessExecution { + + private final ErrorInfo failureInfo; + private final TaskManagerLocation taskManagerLocation; + + private TestingExecution( + @Nullable ErrorInfo failureInfo, + @Nullable TaskManagerLocation taskManagerLocation) { + this.failureInfo = failureInfo; + this.taskManagerLocation = taskManagerLocation; + } + + @Override + public Optional getFailureInfo() { + return Optional.ofNullable(failureInfo); + } + + @Override + public TaskManagerLocation getAssignedResourceLocation() { + return taskManagerLocation; + } + + @Override + public long getStateTimestamp(ExecutionState state) { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public int getParallelSubtaskIndex() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public IOMetrics getIOMetrics() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public ExecutionAttemptID getAttemptId() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public int getAttemptNumber() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public long[] getStateTimestamps() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + + @Override + public ExecutionState getState() { + throw new UnsupportedOperationException("Method should not be triggered."); + } + } + + public static Matcher + isArchivedTaskManagerLocation( + 
ExceptionHistoryEntry.ArchivedTaskManagerLocation actualLocation) { + return new ArchivedTaskManagerLocationMatcher(actualLocation); + } + + private static class ArchivedTaskManagerLocationMatcher + extends TypeSafeDiagnosingMatcher { + + private final ExceptionHistoryEntry.ArchivedTaskManagerLocation expectedLocation; + + public ArchivedTaskManagerLocationMatcher( + ExceptionHistoryEntry.ArchivedTaskManagerLocation expectedLocation) { + this.expectedLocation = expectedLocation; + } + + @Override + protected boolean matchesSafely( + ExceptionHistoryEntry.ArchivedTaskManagerLocation actual, Description description) { + if (actual == null) { + return expectedLocation == null; + } + + boolean match = true; + if (!Objects.equals(actual.getAddress(), expectedLocation.getAddress())) { + description.appendText(" address=").appendText(actual.getAddress()); + match = false; + } + + if (!Objects.equals(actual.getFQDNHostname(), expectedLocation.getFQDNHostname())) { + description.appendText(" FQDNHostname=").appendText(actual.getFQDNHostname()); + match = false; + } + + if (!Objects.equals(actual.getHostname(), expectedLocation.getHostname())) { + description.appendText(" hostname=").appendText(actual.getHostname()); + match = false; + } + + if (!Objects.equals(actual.getResourceID(), expectedLocation.getResourceID())) { + description + .appendText(" resourceID=") + .appendText(actual.getResourceID().toString()); + match = false; + } + + if (!Objects.equals(actual.getPort(), expectedLocation.getPort())) { + description.appendText(" port=").appendText(String.valueOf(actual.getPort())); + match = false; + } + + return match; + } + + @Override + public void describeTo(Description description) { + description.appendText(String.valueOf(expectedLocation)); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java index 
5c938c39d33b8..59fea5250d654 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocator.java @@ -42,16 +42,22 @@ public class TestExecutionSlotAllocator implements ExecutionSlotAllocator, SlotO private final Map pendingRequests = new HashMap<>(); - private final TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder(); + private final TestingLogicalSlotBuilder logicalSlotBuilder; private boolean autoCompletePendingRequests = true; private final List returnedSlots = new ArrayList<>(); - public TestExecutionSlotAllocator() {} + public TestExecutionSlotAllocator() { + this(new TestingLogicalSlotBuilder()); + } public TestExecutionSlotAllocator(TaskManagerGateway taskManagerGateway) { - logicalSlotBuilder.setTaskManagerGateway(taskManagerGateway); + this(new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway)); + } + + public TestExecutionSlotAllocator(TestingLogicalSlotBuilder logicalSlotBuilder) { + this.logicalSlotBuilder = logicalSlotBuilder; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java index 59f3ab27eb7eb..9d71af50977f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; /** Factory for {@link TestExecutionSlotAllocatorFactory}. 
*/ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory { @@ -34,6 +35,10 @@ public TestExecutionSlotAllocatorFactory(TaskManagerGateway gateway) { this.testExecutionSlotAllocator = new TestExecutionSlotAllocator(gateway); } + public TestExecutionSlotAllocatorFactory(TestingLogicalSlotBuilder logicalSlotBuilder) { + this.testExecutionSlotAllocator = new TestExecutionSlotAllocator(logicalSlotBuilder); + } + @Override public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) { return testExecutionSlotAllocator; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java new file mode 100644 index 0000000000000..818b7e1310bce --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BoundedFIFOQueueTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.hamcrest.collection.IsIterableWithSize.iterableWithSize; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** {@code BoundedFIFOQueueTest} tests {@link BoundedFIFOQueue}. */ +public class BoundedFIFOQueueTest extends TestLogger { + + @Test(expected = IllegalArgumentException.class) + public void testConstructorFailing() { + new BoundedFIFOQueue<>(-1); + } + + @Test + public void testQueueWithMaxSize0() { + final BoundedFIFOQueue testInstance = new BoundedFIFOQueue<>(0); + assertThat(testInstance, iterableWithSize(0)); + testInstance.add(1); + assertThat(testInstance, iterableWithSize(0)); + } + + @Test + public void testQueueWithMaxSize2() { + final BoundedFIFOQueue testInstance = new BoundedFIFOQueue<>(2); + assertThat(testInstance, iterableWithSize(0)); + + testInstance.add(1); + assertThat(testInstance, contains(1)); + + testInstance.add(2); + assertThat(testInstance, contains(1, 2)); + + testInstance.add(3); + assertThat(testInstance, contains(2, 3)); + } + + @Test + public void testAddNullHandling() { + final BoundedFIFOQueue testInstance = new BoundedFIFOQueue<>(1); + try { + testInstance.add(null); + fail("A NullPointerException is expected to be thrown."); + } catch (NullPointerException e) { + // NullPointerException is expected + } + + assertThat(testInstance, iterableWithSize(0)); + } + + /** + * Tests that {@link BoundedFIFOQueue#size()} returns the number of elements currently stored in + * the queue with a {@code maxSize} of 0. 
+ */ + @Test + public void testSizeWithMaxSize0() { + final BoundedFIFOQueue testInstance = new BoundedFIFOQueue<>(0); + assertThat(testInstance.size(), is(0)); + + testInstance.add(1); + assertThat(testInstance.size(), is(0)); + } + + /** + * Tests that {@link BoundedFIFOQueue#size()} returns the number of elements currently stored in + * the queue with a {@code maxSize} of 2. + */ + @Test + public void testSizeWithMaxSize2() { + final BoundedFIFOQueue testInstance = new BoundedFIFOQueue<>(2); + assertThat(testInstance.size(), is(0)); + + testInstance.add(5); + assertThat(testInstance.size(), is(1)); + + testInstance.add(6); + assertThat(testInstance.size(), is(2)); + + // adding a 3rd element won't increase the size anymore + testInstance.add(7); + assertThat(testInstance.size(), is(2)); + } +}