diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java index f643bc4fb4da9..3bbc5cddd0f58 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TaskManagerExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +147,7 @@ private static ByteBuffer allocateDirectMemory(int size) { // once we find a common way to handle OOM errors in netty threads. // Here we enrich it to propagate better OOM message to the receiver // if it happens in a netty thread. - Throwable enrichedOutOfMemoryError = ExceptionUtils.tryEnrichTaskManagerError(outOfMemoryError); + Throwable enrichedOutOfMemoryError = TaskManagerExceptionUtils.tryEnrichTaskManagerError(outOfMemoryError); if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) { LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError); } diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 8982b29c67bc5..980c95eaaca94 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -25,7 +25,6 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.util.function.RunnableWithException; import javax.annotation.Nullable; @@ -49,27 +48,6 @@ public final class ExceptionUtils { /** The stringified representation of a null exception reference. */ public static final String STRINGIFIED_NULL_EXCEPTION = "(null)"; - private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format( - "Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " + - "a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " + - "allocated by user code or some of its dependencies. In this case '%s' configuration option should be " + - "increased. Flink framework and its dependencies also consume the direct memory, mostly for network " + - "communication. The most of network memory is managed by Flink and should not result in out-of-memory " + - "error. In certain special cases, in particular for jobs with high parallelism, the framework may " + - "require more direct memory which is not managed by Flink. In this case '%s' configuration option " + - "should be increased. If the error persists then there is probably a direct memory leak which has to " + - "be investigated and fixed. The task executor has to be shutdown...", - TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), - TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key()); - - private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format( - "Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " + - "a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " + - "'%s' configuration option should be increased. If the error persists (usually in cluster after " + - "several job (re-)submissions) then there is probably a class loading leak which has to be " + - "investigated and fixed. The task executor has to be shutdown...", - TaskManagerOptions.JVM_METASPACE.key()); - /** * Makes a string representation of the exception's stack trace, or "(null)", if the * exception is null. @@ -142,24 +120,33 @@ public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) { * {@code null} if the argument was {@code null} */ @Nullable - public static Throwable tryEnrichTaskManagerError(@Nullable Throwable exception) { - if (exception instanceof OutOfMemoryError) { - return tryEnrichTaskManagerOutOfMemoryError((OutOfMemoryError) exception); + static Throwable tryEnrichOutOfMemoryError( + @Nullable Throwable exception, + String jvmMetaspaceOomNewErrorMessage, + String jvmDirectOomNewErrorMessage) { + boolean isOom = exception instanceof OutOfMemoryError; + if (!isOom) { + return exception; } - return exception; - } - - private static Throwable tryEnrichTaskManagerOutOfMemoryError(OutOfMemoryError oom) { + OutOfMemoryError oom = (OutOfMemoryError) exception; if (isMetaspaceOutOfMemoryError(oom)) { - return changeOutOfMemoryErrorMessage(oom, TM_METASPACE_OOM_ERROR_MESSAGE); + return changeOutOfMemoryErrorMessage(oom, jvmMetaspaceOomNewErrorMessage); } else if (isDirectOutOfMemoryError(oom)) { - return changeOutOfMemoryErrorMessage(oom, TM_DIRECT_OOM_ERROR_MESSAGE); + return changeOutOfMemoryErrorMessage(oom, jvmDirectOomNewErrorMessage); } return oom; } + /** + * Rewrites the error message of a {@link OutOfMemoryError}. + * + * @param oom original {@link OutOfMemoryError} + * @param newMessage new error message + * @return the origianl {@link OutOfMemoryError} if it already has the new error message or + * a new {@link OutOfMemoryError} with the new error message + */ private static OutOfMemoryError changeOutOfMemoryErrorMessage(OutOfMemoryError oom, String newMessage) { if (oom.getMessage().equals(newMessage)) { return oom; diff --git a/flink-core/src/main/java/org/apache/flink/util/TaskManagerExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/TaskManagerExceptionUtils.java new file mode 100644 index 0000000000000..b1c680e67d86d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/TaskManagerExceptionUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.TaskManagerOptions; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.ExceptionUtils.tryEnrichOutOfMemoryError; + +/** + * Exception utils to handle and enrich exceptions occurring in TaskManager. + */ +public class TaskManagerExceptionUtils { + private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format( + "Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " + + "a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " + + "allocated by user code or some of its dependencies. In this case '%s' configuration option should be " + + "increased. Flink framework and its dependencies also consume the direct memory, mostly for network " + + "communication. The most of network memory is managed by Flink and should not result in out-of-memory " + + "error. In certain special cases, in particular for jobs with high parallelism, the framework may " + + "require more direct memory which is not managed by Flink. In this case '%s' configuration option " + + "should be increased. If the error persists then there is probably a direct memory leak in user code or " + + "some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...", + TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), + TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key()); + + private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format( + "Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " + + "a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " + + "'%s' configuration option should be increased. If the error persists (usually in cluster after " + + "several job (re-)submissions) then there is probably a class loading leak in user code or some of its dependencies " + + "which has to be investigated and fixed. The task executor has to be shutdown...", + TaskManagerOptions.JVM_METASPACE.key()); + + private TaskManagerExceptionUtils() { + } + + /** + * Tries to enrich the passed exception with additional information. + * + *

This method improves error message for direct and metaspace {@link OutOfMemoryError}. + * It adds description of possible causes and ways of resolution. + * + * @param exception exception to enrich if not {@code null} + * @return the enriched exception or the original if no additional information could be added; + * {@code null} if the argument was {@code null} + */ + @Nullable + public static Throwable tryEnrichTaskManagerError(@Nullable Throwable exception) { + return tryEnrichOutOfMemoryError(exception, TM_METASPACE_OOM_ERROR_MESSAGE, TM_DIRECT_OOM_ERROR_MESSAGE); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java index fc72fbb4ca745..6114fec6dbe00 100644 --- a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java @@ -91,7 +91,7 @@ public void testInvalidExceptionStripping() { @Test public void testTryEnrichTaskExecutorErrorCanHandleNullValue() { - assertThat(ExceptionUtils.tryEnrichTaskManagerError(null), is(nullValue())); + assertThat(ExceptionUtils.tryEnrichOutOfMemoryError(null, "", ""), is(nullValue())); } @Test diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 283ca5e085842..58f29a4201b75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.util.TaskManagerExceptionUtils; import org.apache.flink.runtime.taskmanager.MemoryLogger; import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -251,7 +252,7 @@ public CompletableFuture getTerminationFuture() { @Override public void onFatalError(Throwable exception) { - Throwable enrichedException = ExceptionUtils.tryEnrichTaskManagerError(exception); + Throwable enrichedException = TaskManagerExceptionUtils.tryEnrichTaskManagerError(exception); LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", enrichedException); // In case of the Metaspace OutOfMemoryError, we expect that the graceful shutdown is possible, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f0bff660cae70..bd5b1033114e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -75,6 +75,7 @@ import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker; +import org.apache.flink.util.TaskManagerExceptionUtils; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload; import org.apache.flink.runtime.util.FatalExitExceptionHandler; import org.apache.flink.types.Either; @@ -754,7 +755,7 @@ else if (current == ExecutionState.CANCELING) { // an exception was thrown as a side effect of cancelling // ---------------------------------------------------------------- - t = ExceptionUtils.tryEnrichTaskManagerError(t); + t = TaskManagerExceptionUtils.tryEnrichTaskManagerError(t); try { // check if the exception is unrecoverable