diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index c6fbd4f02301c..96443d19e7faa 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -29,6 +29,8 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.slf4j.Logger;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -678,6 +680,34 @@ public static void checkInterrupted(Throwable e) {
         }
     }
 
+    /**
+     * Returns the given exception unless it is a {@link FlinkExpectedException}.
+     *
+     * <p>Meant to be passed as the trailing logger argument so that only unexpected exceptions
+     * are printed together with the log message.
+     *
+     * @param e the exception to check, may be {@code null}
+     * @return {@code e} itself, or {@code null} if {@code e} is a {@link FlinkExpectedException}
+     *     (or was {@code null} to begin with)
+     */
+    @Nullable
+    public static Throwable returnExceptionIfUnexpected(@Nullable Throwable e) {
+        return e instanceof FlinkExpectedException ? null : e;
+    }
+
+    /**
+     * Logs the given exception at debug level if it is a {@link FlinkExpectedException}; does
+     * nothing otherwise (including for {@code null}).
+     *
+     * @param e the exception to check, may be {@code null}
+     * @param log the logger to write to
+     */
+    public static void logExceptionIfExcepted(@Nullable Throwable e, Logger log) {
+        if (e instanceof FlinkExpectedException) {
+            log.debug("Expected exception.", e);
+        }
+    }
+
     // ------------------------------------------------------------------------
     // Lambda exception utilities
     // ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java
new file mode 100644
index 0000000000000..ef22026f1589a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkExpectedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/** This class is just used to pass the diagnostic message of some expected procedure. */
+public class FlinkExpectedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public FlinkExpectedException(String message) {
+        super(message);
+    }
+
+    public FlinkExpectedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FlinkExpectedException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index af8997465e355..1884d33230c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -496,10 +496,12 @@ public CompletableFuture notifyPartitionDataAvailable(
     @Override
     public CompletableFuture disconnectTaskManager(
             final ResourceID resourceID, final Exception cause) {
-        log.debug(
+        log.info(
                 "Disconnect TaskExecutor {} because: {}",
                 resourceID.getStringWithMetadata(),
-                cause.getMessage());
+                cause.getMessage(),
+                ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+        ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
         taskManagerHeartbeatManager.unmonitorTarget(resourceID);
         slotPoolService.releaseTaskManager(resourceID, cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f4834efbc3d6e..3cd99ca0cf78a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1014,7 +1014,9 @@ protected Optional closeTaskManagerConnection(
             log.info(
                     "Closing TaskExecutor connection {} because: {}",
                     resourceID.getStringWithMetadata(),
-                    cause.getMessage());
+                    cause.getMessage(),
+                    ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+            ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
             // TODO :: suggest failed task executor to stop itself
             slotManager.unregisterTaskManager(workerRegistration.getInstanceID(), cause);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index 8872d8c3bab8f..f3d0b3fcb7986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -37,7 +37,7 @@
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -332,7 +332,8 @@ && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
                         maxTotalMem.toHumanReadableString());
                 resourceActions.releaseResource(
                         taskExecutorConnection.getInstanceID(),
-                        new FlinkException("The max total resource limitation is reached."));
+                        new FlinkExpectedException(
+                                "The max total resource limitation is reached."));
                 return false;
             }
 
@@ -707,7 +708,8 @@ private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo)
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkException cause = new FlinkException("TaskManager exceeded the idle timeout.");
+        final FlinkExpectedException cause =
+                new FlinkExpectedException("TaskManager exceeded the idle timeout.");
         resourceActions.releaseResource(timedOutTaskManagerId, cause);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
index ebe61ace658c1..1df5a872ee482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
@@ -27,7 +27,7 @@
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -159,7 +159,8 @@ public boolean registerTaskManager(
                     maxSlotNum);
             resourceActions.releaseResource(
                     taskExecutorConnection.getInstanceID(),
-                    new FlinkException("The total number of slots exceeds the max limitation."));
+                    new FlinkExpectedException(
+                            "The total number of slots exceeds the max limitation."));
             return false;
         }
 
@@ -400,7 +401,8 @@ private void releaseIdleTaskExecutorIfPossible(
     }
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
-        final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
+        final FlinkExpectedException cause =
+                new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
         LOG.debug(
                 "Release TaskExecutor {} because it exceeded the idle timeout.",
                 timedOutTaskManagerId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index b7e6ee285785b..10b5b6eae61f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -132,6 +132,7 @@
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkExpectedException;
 import org.apache.flink.util.OptionalConsumer;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
@@ -453,7 +454,8 @@ public CompletableFuture onStop() {
 
         Throwable jobManagerDisconnectThrowable = null;
 
-        FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+        FlinkExpectedException cause =
+                new FlinkExpectedException("The TaskExecutor is shutting down.");
 
         closeResourceManagerConnection(cause);
 
@@ -1414,11 +1416,11 @@ private void closeResourceManagerConnection(Exception cause) {
             final ResourceID resourceManagerResourceId =
                     establishedResourceManagerConnection.getResourceManagerResourceId();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Close ResourceManager connection {}.", resourceManagerResourceId, cause);
-            } else {
-                log.info("Close ResourceManager connection {}.", resourceManagerResourceId);
-            }
+            log.info(
+                    "Close ResourceManager connection {}.",
+                    resourceManagerResourceId,
+                    ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+            ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
             resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);
 
             ResourceManagerGateway resourceManagerGateway =
@@ -1665,11 +1667,11 @@ private void closeJob(JobTable.Job job, Exception cause) {
     private void disconnectJobManagerConnection(
             JobTable.Connection jobManagerConnection, Exception cause) {
         final JobID jobId = jobManagerConnection.getJobId();
-        if (log.isDebugEnabled()) {
-            log.debug("Close JobManager connection for job {}.", jobId, cause);
-        } else {
-            log.info("Close JobManager connection for job {}.", jobId);
-        }
+        log.info(
+                "Close JobManager connection for job {}.",
+                jobId,
+                ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
+        ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
 
         // 1. fail tasks running under this JobID
         Iterator tasks = taskSlotTable.getTasks(jobId);
@@ -1970,8 +1972,8 @@ private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
                 && !partitionTracker.isTrackingPartitionsFor(jobId)) {
             // we can remove the job from the job leader service
 
-            final FlinkException cause =
-                    new FlinkException(
+            final FlinkExpectedException cause =
+                    new FlinkExpectedException(
                             "TaskExecutor "
                                     + getAddress()
                                     + " has no more allocated slots for job "