Skip to content

Commit

Permalink
[hotfix] Introduce ShutdownHookUtil to avoid code duplication
Browse files Browse the repository at this point in the history
(Un)registering shotdown hooks for cleanups is a very common concern in Flink.
Many places in the code essentially duplicate all the code for doing this.
This commit introduces a utils class and deduplicates the code.
  • Loading branch information
StefanRRichter committed Feb 25, 2018
1 parent 4e7f03e commit 4d19b1c
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 262 deletions.
100 changes: 100 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.slf4j.Logger;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Utils class for dealing with JVM shutdown hooks.
*/
public class ShutdownHookUtil {

/**
* Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
*/
public static Thread addShutdownHook(
final AutoCloseable service,
final String serviceName,
final Logger logger) {

checkNotNull(service);
checkNotNull(logger);

final Thread shutdownHook = new Thread(() -> {
try {
service.close();
} catch (Throwable t) {
logger.error("Error during shutdown of {} via JVM shutdown hook.", serviceName, t);
}
}, serviceName + " shutdown hook");

return addShutdownHookThread(shutdownHook, serviceName, logger) ? shutdownHook : null;
}

/**
* Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
*/
public static boolean addShutdownHookThread(
final Thread shutdownHook,
final String serviceName,
final Logger logger) {

checkNotNull(shutdownHook);
checkNotNull(logger);

try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
return true;
} catch (IllegalStateException e) {
// JVM is already shutting down. no need to do our work
} catch (Throwable t) {
logger.error("Cannot register shutdown hook that cleanly terminates {}.", serviceName, t);
}
return false;
}

/**
* Removes a shutdown hook from the JVM.
*/
public static void removeShutdownHook(final Thread shutdownHook, final String serviceName, final Logger logger) {

// Do not run if this is invoked by the shutdown hook itself
if (shutdownHook == null || shutdownHook == Thread.currentThread()) {
return;
}

checkNotNull(logger);

try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
logger.debug("Unable to remove shutdown hook for {}, shutdown already in progress", serviceName, e);
} catch (Throwable t) {
logger.warn("Exception while un-registering {}'s shutdown hook.", serviceName, t);
}
}

private ShutdownHookUtil() {
throw new AssertionError();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ShutdownHookUtil;

import java.io.File;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -224,19 +225,8 @@ protected PlanExecutor getExecutor() throws Exception {
// ------------------------------------------------------------------------

protected void dispose() {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
LOG.warn("Exception while unregistering the cleanup shutdown hook.");
}
}
// Remove shutdown hook to prevent resource leaks
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

try {
PlanExecutor executor = this.executor;
Expand All @@ -262,29 +252,7 @@ public String toString() {

private void installShutdownHook() {
if (shutdownHook == null) {
Thread shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
dispose();
}
catch (Throwable t) {
LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(), t);
}
}
});

try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
this.shutdownHook = shutdownHook;
}
catch (IllegalStateException e) {
// JVM is already shutting down. no need or a shutdown hook
}
catch (Throwable t) {
LOG.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
}
this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::dispose, getClass().getSimpleName(), LOG);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,18 +125,10 @@ private void startPython() throws IOException {
errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg));
errorPrinter.start();

shutdownThread = new Thread() {
@Override
public void run() {
try {
destroyProcess(process);
} catch (IOException ioException) {
LOG.warn("Could not destroy python process.", ioException);
}
}
};

Runtime.getRuntime().addShutdownHook(shutdownThread);
shutdownThread = ShutdownHookUtil.addShutdownHook(
() -> destroyProcess(process),
getClass().getSimpleName(),
LOG);

OutputStream processOutput = process.getOutputStream();
processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
Expand Down Expand Up @@ -207,9 +200,7 @@ public void close() throws IOException {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
}

if (shutdownThread != null) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
ShutdownHookUtil.removeShutdownHook(shutdownThread, getClass().getSimpleName(), LOG);

ExceptionUtils.tryRethrowIOException(throwable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
Expand Down Expand Up @@ -382,20 +383,7 @@ public WebRuntimeMonitor(
webRootDir));

// add shutdown hook for deleting the directories and remaining temp files on shutdown
try {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
cleanup();
}
});
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
} catch (Throwable t) {
// these errors usually happen when the shutdown is already in progress
LOG.warn("Error while adding shutdown hook", t);
}
ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);

this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;

Expand Down Expand Up @@ -181,22 +182,10 @@ public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) thro
long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);

this.shutdownHook = new Thread() {
@Override
public void run() {
HistoryServer.this.stop();
}
};
// add shutdown hook for deleting the directories and remaining temp files on shutdown
try {
Runtime.getRuntime().addShutdownHook(shutdownHook);
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
} catch (Throwable t) {
// these errors usually happen when the shutdown is already in progress
LOG.warn("Error while adding shutdown hook", t);
}
this.shutdownHook = ShutdownHookUtil.addShutdownHook(
HistoryServer.this::stop,
HistoryServer.class.getSimpleName(),
LOG);
}

@VisibleForTesting
Expand Down Expand Up @@ -263,16 +252,8 @@ void stop() {

LOG.info("Stopped history server.");

// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException ignored) {
// race, JVM is in shutdown already, we can safely ignore this
} catch (Throwable t) {
LOG.warn("Exception while unregistering HistoryServer cleanup shutdown hook.");
}
}
// Remove shutdown hook to prevent resource leaks
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;

Expand Down Expand Up @@ -116,7 +117,7 @@ public AbstractBlobCache(
}

// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, log);
shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log);

this.serverAddress = serverAddress;
}
Expand Down Expand Up @@ -249,16 +250,8 @@ public void close() throws IOException {
try {
FileUtils.deleteDirectory(storageDir);
} finally {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
} catch (Throwable t) {
log.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
}
}
// Remove shutdown hook to prevent resource leaks
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -169,7 +170,7 @@ public BlobServer(Configuration config, BlobStore blobStore) throws IOException
.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
storageDir, LOG), cleanupInterval, cleanupInterval);

this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
try {
Expand Down Expand Up @@ -345,19 +346,8 @@ public void close() throws IOException {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
}
}
// Remove shutdown hook to prevent resource leaks
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

if (LOG.isInfoEnabled()) {
LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
Expand Down
Loading

0 comments on commit 4d19b1c

Please sign in to comment.