diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index 1f58b119f2768..4f4a6d1942c3a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -362,9 +362,9 @@ public CompletableFuture postStop() { } @Override - protected void shutDownApplication( + protected void internalDeregisterApplication( ApplicationStatus finalStatus, - @Nullable String optionalDiagnostics) throws ResourceManagerException { + @Nullable String diagnostics) throws ResourceManagerException { LOG.info("Shutting down and unregistering as a Mesos framework."); Exception exception = null; diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index 412e18da65a30..5d9a6cffd7038 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -737,7 +737,7 @@ public void testStopWorker() throws Exception { public void testShutdownApplication() throws Exception { new Context() {{ startResourceManager(); - resourceManager.shutDownCluster(ApplicationStatus.SUCCEEDED, ""); + resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, ""); // verify that the Mesos framework is shutdown verify(rmServices.schedulerDriver).stop(false); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 
50d0db335f442..b25729bd1e023 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -197,6 +197,7 @@ protected void startCluster() { shutDownAndTerminate( STARTUP_FAILURE_RETURN_CODE, ApplicationStatus.FAILED, + t.getMessage(), false); } } @@ -245,6 +246,7 @@ protected void runCluster(Configuration configuration) throws Exception { shutDownAndTerminate( SUCCESS_RETURN_CODE, ApplicationStatus.SUCCEEDED, + throwable != null ? throwable.getMessage() : null, true); }); } @@ -544,38 +546,34 @@ private Configuration generateClusterConfiguration(Configuration configuration) return resultConfiguration; } - private CompletableFuture shutDownAsync(boolean cleanupHaData) { + private CompletableFuture shutDownAsync( + boolean cleanupHaData, + ApplicationStatus applicationStatus, + @Nullable String diagnostics) { if (isShutDown.compareAndSet(false, true)) { LOG.info("Stopping {}.", getClass().getSimpleName()); - final CompletableFuture componentShutdownFuture = stopClusterComponents(); - - componentShutdownFuture.whenComplete( - (Void ignored1, Throwable componentThrowable) -> { - final CompletableFuture serviceShutdownFuture = stopClusterServices(cleanupHaData); - - serviceShutdownFuture.whenComplete( - (Void ignored2, Throwable serviceThrowable) -> { - Throwable finalException = null; - - if (serviceThrowable != null) { - finalException = ExceptionUtils.firstOrSuppressed(serviceThrowable, componentThrowable); - } else if (componentThrowable != null) { - finalException = componentThrowable; - } - - try { - cleanupDirectories(); - } catch (IOException e) { - finalException = ExceptionUtils.firstOrSuppressed(e, finalException); - } - - if (finalException != null) { - terminationFuture.completeExceptionally(finalException); - } else { - terminationFuture.complete(null); - } - }); + final CompletableFuture shutDownApplicationFuture = 
deregisterApplication(applicationStatus, diagnostics); + + final CompletableFuture componentShutdownFuture = FutureUtils.composeAfterwards( + shutDownApplicationFuture, + this::stopClusterComponents); + + final CompletableFuture serviceShutdownFuture = FutureUtils.composeAfterwards( + componentShutdownFuture, + () -> stopClusterServices(cleanupHaData)); + + final CompletableFuture cleanupDirectoriesFuture = FutureUtils.runAfterwards( + serviceShutdownFuture, + this::cleanupDirectories); + + cleanupDirectoriesFuture.whenComplete( + (Void ignored2, Throwable serviceThrowable) -> { + if (serviceThrowable != null) { + terminationFuture.completeExceptionally(serviceThrowable); + } else { + terminationFuture.complete(null); + } }); } @@ -585,6 +583,7 @@ private CompletableFuture shutDownAsync(boolean cleanupHaData) { private void shutDownAndTerminate( int returnCode, ApplicationStatus applicationStatus, + @Nullable String diagnostics, boolean cleanupHaData) { if (isTerminating.compareAndSet(false, true)) { @@ -593,7 +592,10 @@ private void shutDownAndTerminate( returnCode, applicationStatus); - shutDownAsync(cleanupHaData).whenComplete( + shutDownAsync( + cleanupHaData, + applicationStatus, + diagnostics).whenComplete( (Void ignored, Throwable t) -> { if (t != null) { LOG.info("Could not properly shut down cluster entrypoint.", t); @@ -608,6 +610,25 @@ private void shutDownAndTerminate( } } + /** + * Deregister the Flink application from the resource management system by signalling + * the {@link ResourceManager}. 
+ * + * @param applicationStatus final status to terminate the application with + * @param diagnostics additional information about the shut down, can be {@code null} + * @return Future which is completed once the deregistration has been acknowledged, or immediately if no ResourceManager is running + */ + private CompletableFuture deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) { + synchronized (lock) { + if (resourceManager != null) { + final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null); + } else { + return CompletableFuture.completedFuture(null); + } + } + } + /** * Clean up of temporary directories created by the {@link ClusterEntrypoint}. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index cae9c6cdb383c..c75346900cdd8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -486,19 +486,21 @@ public void unRegisterInfoMessageListener(final String address) { * Cleanup application and shut down cluster. 
* * @param finalStatus of the Flink application - * @param optionalDiagnostics diagnostics message for the Flink application or {@code null} + * @param diagnostics diagnostics message for the Flink application or {@code null} */ @Override - public void shutDownCluster( + public CompletableFuture deregisterApplication( final ApplicationStatus finalStatus, - @Nullable final String optionalDiagnostics) { - log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics); + @Nullable final String diagnostics) { + log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, diagnostics); try { - shutDownApplication(finalStatus, optionalDiagnostics); + internalDeregisterApplication(finalStatus, diagnostics); } catch (ResourceManagerException e) { log.warn("Could not properly shutdown the application.", e); } + + return CompletableFuture.completedFuture(Acknowledge.get()); } @Override @@ -946,7 +948,7 @@ public void handleError(final Exception exception) { protected abstract void initialize() throws ResourceManagerException; /** - * The framework specific code for shutting down the application. This should report the + * The framework specific code to deregister the application. This should report the * application's final status and shut down the resource manager cleanly. * *

<p>This method also needs to make sure all pending containers that are not registered @@ -956,7 +958,7 @@ public void handleError(final Exception exception) { * @param optionalDiagnostics A diagnostics message or {@code null}. * @throws ResourceManagerException if the application could not be shut down. */ - protected abstract void shutDownApplication( + protected abstract void internalDeregisterApplication( ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 836bc0b0faf00..bd282d6cff0bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -133,11 +135,12 @@ void notifySlotAvailable( void unRegisterInfoMessageListener(String infoMessageListenerAddress); /** - * shutdown cluster - * @param finalStatus - * @param optionalDiagnostics + * Deregister Flink from the underlying resource management system. + * + * @param finalStatus final status with which to deregister the Flink application + * @param diagnostics additional information for the resource management system, can be {@code null} */ - void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics); + CompletableFuture deregisterApplication(final ApplicationStatus finalStatus, @Nullable final String diagnostics); /** * Gets the currently registered number of TaskManagers. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 7226d296f373d..d8e0e480a2a4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -72,7 +72,7 @@ protected void initialize() throws ResourceManagerException { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) { + protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) { } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 3db9be032e6ea..2bd976bd9bf32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -68,7 +68,7 @@ protected void initialize() throws ResourceManagerException { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException { + protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException { // noop } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 33c6c08d66704..9b4041414d62e 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -214,8 +214,8 @@ public void unRegisterInfoMessageListener(String infoMessageListenerAddress) { } @Override - public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) { - + public CompletableFuture deregisterApplication(ApplicationStatus finalStatus, String diagnostics) { + return CompletableFuture.completedFuture(Acknowledge.get()); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 97db2ad8a37a1..bfe7d65262ac6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -266,16 +266,16 @@ public CompletableFuture postStop() { } @Override - protected void shutDownApplication( + protected void internalDeregisterApplication( ApplicationStatus finalStatus, - @Nullable String optionalDiagnostics) { + @Nullable String diagnostics) { // first, de-register from YARN FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus); try { - resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, ""); + resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, ""); } catch (Throwable t) { log.error("Could not unregister the application master.", t); }