Skip to content

Commit

Permalink
[FLINK-27256][runtime] Log the root exception in closing the task man…
Browse files Browse the repository at this point in the history
…ager connection

This closes apache#19481.
  • Loading branch information
KarmaGYZ committed Apr 19, 2022
1 parent 50dacad commit da53242
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 22 deletions.
24 changes: 24 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.function.RunnableWithException;

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.io.IOException;
Expand Down Expand Up @@ -678,6 +680,28 @@ public static void checkInterrupted(Throwable e) {
}
}

/**
* Return the given exception if it is not a {@link FlinkExpectedException}.
*
* @param e the given exception
* @return the given exception if it is not a {@link FlinkExpectedException}
*/
public static Throwable returnExceptionIfUnexpected(Throwable e) {
return e instanceof FlinkExpectedException ? null : e;
}

/**
* Log the given exception in debug level if it is a {@link FlinkExpectedException}.
*
* @param e the given exception
* @param log logger
*/
public static void logExceptionIfExcepted(Throwable e, Logger log) {
if (e instanceof FlinkExpectedException) {
log.debug("Expected exception.", e);
}
}

// ------------------------------------------------------------------------
// Lambda exception utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/** This class is just used to pass diagnostic message of some excepted procedure. */
public class FlinkExpectedException extends Exception {
private static final long serialVersionUID = 1L;

public FlinkExpectedException(String message) {
super(message);
}

public FlinkExpectedException(String message, Throwable cause) {
super(message, cause);
}

public FlinkExpectedException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,12 @@ public CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
@Override
public CompletableFuture<Acknowledge> disconnectTaskManager(
final ResourceID resourceID, final Exception cause) {
log.debug(
log.info(
"Disconnect TaskExecutor {} because: {}",
resourceID.getStringWithMetadata(),
cause.getMessage());
cause.getMessage(),
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);

taskManagerHeartbeatManager.unmonitorTarget(resourceID);
slotPoolService.releaseTaskManager(resourceID, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,9 @@ protected Optional<WorkerType> closeTaskManagerConnection(
log.info(
"Closing TaskExecutor connection {} because: {}",
resourceID.getStringWithMetadata(),
cause.getMessage());
cause.getMessage(),
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);

// TODO :: suggest failed task executor to stop itself
slotManager.unregisterTaskManager(workerRegistration.getInstanceID(), cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -332,7 +332,8 @@ && isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
maxTotalMem.toHumanReadableString());
resourceActions.releaseResource(
taskExecutorConnection.getInstanceID(),
new FlinkException("The max total resource limitation is reached."));
new FlinkExpectedException(
"The max total resource limitation is reached."));
return false;
}

Expand Down Expand Up @@ -707,7 +708,8 @@ private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo)
}

private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
final FlinkException cause = new FlinkException("TaskManager exceeded the idle timeout.");
final FlinkExpectedException cause =
new FlinkExpectedException("TaskManager exceeded the idle timeout.");
resourceActions.releaseResource(timedOutTaskManagerId, cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -159,7 +159,8 @@ public boolean registerTaskManager(
maxSlotNum);
resourceActions.releaseResource(
taskExecutorConnection.getInstanceID(),
new FlinkException("The total number of slots exceeds the max limitation."));
new FlinkExpectedException(
"The total number of slots exceeds the max limitation."));
return false;
}

Expand Down Expand Up @@ -400,7 +401,8 @@ private void releaseIdleTaskExecutorIfPossible(
}

private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
final FlinkExpectedException cause =
new FlinkExpectedException("TaskExecutor exceeded the idle timeout.");
LOG.debug(
"Release TaskExecutor {} because it exceeded the idle timeout.",
timedOutTaskManagerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
Expand Down Expand Up @@ -453,7 +454,8 @@ public CompletableFuture<Void> onStop() {

Throwable jobManagerDisconnectThrowable = null;

FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
FlinkExpectedException cause =
new FlinkExpectedException("The TaskExecutor is shutting down.");

closeResourceManagerConnection(cause);

Expand Down Expand Up @@ -1414,11 +1416,11 @@ private void closeResourceManagerConnection(Exception cause) {
final ResourceID resourceManagerResourceId =
establishedResourceManagerConnection.getResourceManagerResourceId();

if (log.isDebugEnabled()) {
log.debug("Close ResourceManager connection {}.", resourceManagerResourceId, cause);
} else {
log.info("Close ResourceManager connection {}.", resourceManagerResourceId);
}
log.info(
"Close ResourceManager connection {}.",
resourceManagerResourceId,
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceId);

ResourceManagerGateway resourceManagerGateway =
Expand Down Expand Up @@ -1665,11 +1667,11 @@ private void closeJob(JobTable.Job job, Exception cause) {
private void disconnectJobManagerConnection(
JobTable.Connection jobManagerConnection, Exception cause) {
final JobID jobId = jobManagerConnection.getJobId();
if (log.isDebugEnabled()) {
log.debug("Close JobManager connection for job {}.", jobId, cause);
} else {
log.info("Close JobManager connection for job {}.", jobId);
}
log.info(
"Close JobManager connection for job {}.",
jobId,
ExceptionUtils.returnExceptionIfUnexpected(cause.getCause()));
ExceptionUtils.logExceptionIfExcepted(cause.getCause(), log);

// 1. fail tasks running under this JobID
Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
Expand Down Expand Up @@ -1970,8 +1972,8 @@ private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) {
&& !partitionTracker.isTrackingPartitionsFor(jobId)) {
// we can remove the job from the job leader service

final FlinkException cause =
new FlinkException(
final FlinkExpectedException cause =
new FlinkExpectedException(
"TaskExecutor "
+ getAddress()
+ " has no more allocated slots for job "
Expand Down

0 comments on commit da53242

Please sign in to comment.