Skip to content

Commit

Permalink
[FLINK-8677] [flip6] Make ClusterEntrypoint shut down non-blocking
Browse files Browse the repository at this point in the history
Makes the ClusterEntrypoint shut down method non-blocking. This also removes
the need to run the Dispatcher#terminationFuture callback in the common
Fork-Join pool.

This closes apache#5512.
  • Loading branch information
tillrohrmann committed Feb 23, 2018
1 parent 65081ac commit fba655a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
Expand All @@ -43,7 +44,6 @@
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

Expand All @@ -59,6 +59,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.CompletableFuture;

/**
* Entry point for Mesos per-job clusters.
Expand Down Expand Up @@ -167,26 +168,16 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
}

@Override
protected void stopClusterServices(boolean cleanupHaData) throws FlinkException {
Throwable exception = null;

try {
super.stopClusterServices(cleanupHaData);
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

if (mesosServices != null) {
try {
mesosServices.close(cleanupHaData);
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
}

if (exception != null) {
throw new FlinkException("Could not properly shut down the Mesos job cluster entry point.", exception);
}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);

return FutureUtils.runAfterwards(
serviceShutDownFuture,
() -> {
if (mesosServices != null) {
mesosServices.close(cleanupHaData);
}
});
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
Expand All @@ -42,7 +43,6 @@
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.apache.commons.cli.CommandLine;
Expand All @@ -52,6 +52,8 @@

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
* Entry point for Mesos session clusters.
*/
Expand Down Expand Up @@ -141,26 +143,16 @@ protected ResourceManager<?> createResourceManager(
}

@Override
protected void stopClusterServices(boolean cleanupHaData) throws FlinkException {
Throwable exception = null;

try {
super.stopClusterServices(cleanupHaData);
} catch (Throwable t) {
exception = t;
}

if (mesosServices != null) {
try {
mesosServices.close(cleanupHaData);
} catch (Throwable t) {
exception = t;
}
}

if (exception != null) {
throw new FlinkException("Could not properly shut down the Mesos session cluster entry point.", exception);
}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);

return FutureUtils.runAfterwards(
serviceShutDownFuture,
() -> {
if (mesosServices != null) {
mesosServices.close(cleanupHaData);
}
});
}

public static void main(String[] args) {
Expand Down
Loading

0 comments on commit fba655a

Please sign in to comment.