Skip to content

Commit

Permalink
[FLINK-11253] Add shutdown hook for yarn session client
Browse files Browse the repository at this point in the history
  • Loading branch information
Tao Yang authored and tillrohrmann committed Jan 10, 2019
1 parent 5fe918c commit 1abf8d1
Showing 1 changed file with 52 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
Expand Down Expand Up @@ -650,48 +651,22 @@ public int run(String[] args) throws CliArgsException, FlinkException {
yarnClusterDescriptor.getYarnClient(),
yarnApplicationId,
new ScheduledExecutorServiceAdapter(scheduledExecutorService));

Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
() -> shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
yarnApplicationStatusMonitor, yarnApplicationId),
getClass().getSimpleName(),
LOG);
try {
runInteractiveCli(
clusterClient,
yarnApplicationStatusMonitor,
acceptInteractiveInput);
} finally {
try {
yarnApplicationStatusMonitor.close();
} catch (Exception e) {
LOG.info("Could not properly close the Yarn application status monitor.", e);
}

clusterClient.shutDownCluster();

try {
clusterClient.shutdown();
} catch (Exception e) {
LOG.info("Could not properly shutdown cluster client.", e);
}

// shut down the scheduled executor service
ExecutorUtils.gracefulShutdown(
1000L,
TimeUnit.MILLISECONDS,
scheduledExecutorService);

deleteYarnPropertiesFile();

ApplicationReport applicationReport;

try {
applicationReport = yarnClusterDescriptor
.getYarnClient()
.getApplicationReport(yarnApplicationId);
} catch (YarnException | IOException e) {
LOG.info("Could not log the final application report.", e);
applicationReport = null;
}

if (applicationReport != null) {
logFinalApplicationReport(applicationReport);
shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService,
yarnApplicationStatusMonitor, yarnApplicationId);
if (shutdownHook != null) {
// we do not need the hook anymore as we have just tried to shutdown the cluster.
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}
}
}
Expand All @@ -707,6 +682,47 @@ public int run(String[] args) throws CliArgsException, FlinkException {
return 0;
}

private void shutdownCluster(AbstractYarnClusterDescriptor yarnClusterDescriptor,
ClusterClient clusterClient, ScheduledExecutorService scheduledExecutorService,
YarnApplicationStatusMonitor yarnApplicationStatusMonitor, ApplicationId yarnApplicationId) {
try {
yarnApplicationStatusMonitor.close();
} catch (Exception e) {
LOG.info("Could not properly close the Yarn application status monitor.", e);
}

clusterClient.shutDownCluster();

try {
clusterClient.shutdown();
} catch (Exception e) {
LOG.info("Could not properly shutdown cluster client.", e);
}

// shut down the scheduled executor service
ExecutorUtils.gracefulShutdown(
1000L,
TimeUnit.MILLISECONDS,
scheduledExecutorService);

deleteYarnPropertiesFile();

ApplicationReport applicationReport;

try {
applicationReport = yarnClusterDescriptor
.getYarnClient()
.getApplicationReport(yarnApplicationId);
} catch (YarnException | IOException e) {
LOG.info("Could not log the final application report.", e);
applicationReport = null;
}

if (applicationReport != null) {
logFinalApplicationReport(applicationReport);
}
}

private void logFinalApplicationReport(ApplicationReport appReport) {
LOG.info("Application " + appReport.getApplicationId() + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport
Expand Down

0 comments on commit 1abf8d1

Please sign in to comment.