diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutorFactory.java index 092b0d8ab5e1b..8bc605644a815 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutorFactory.java @@ -30,6 +30,11 @@ @Internal public class LocalExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return LocalExecutor.NAME; + } + @Override public boolean isCompatibleWith(final Configuration configuration) { return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET)); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java index 42840e3d036c2..c8fb8d19ab33d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java @@ -30,6 +30,11 @@ @Internal public class RemoteExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return RemoteExecutor.NAME; + } + @Override public boolean isCompatibleWith(final Configuration configuration) { return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET)); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 51e71ecdcd9df..6f291e722e987 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -68,6 +68,7 @@ import java.net.URL; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; @@ -356,6 +357,12 @@ private static final class TestExecutorServiceLoader implements PipelineExecutor @Override public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) { return new PipelineExecutorFactory() { + + @Override + public String getName() { + return "my-name"; + } + @Override public boolean isCompatibleWith(@Nonnull Configuration configuration) { return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); @@ -377,5 +384,10 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) { } }; } + + @Override + public Stream getExecutorNames() { + throw new UnsupportedOperationException("not implemented"); + } } } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index dfcf8b4208f8b..e146b088e959e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.ServiceLoader; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -82,6 +84,12 @@ public PipelineExecutorFactory getExecutorFactory(final Configuration configurat return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0); } + @Override + public Stream getExecutorNames() { + return StreamSupport.stream(defaultLoader.spliterator(), false) + .map(PipelineExecutorFactory::getName); + } + private DefaultExecutorServiceLoader() { // make sure nobody instantiates us explicitly. } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorFactory.java index 0e575223bea88..09611a5aa6937 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorFactory.java @@ -28,6 +28,11 @@ @Internal public interface PipelineExecutorFactory { + /** + * Returns the name of the executor that this factory creates. + */ + String getName(); + /** * Returns {@code true} if this factory is compatible with the options in the * provided configuration, {@code false} otherwise. diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorServiceLoader.java index 79057db938737..a1fa461b5eede 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutorServiceLoader.java @@ -21,6 +21,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; +import java.util.stream.Stream; + /** * An interface to be implemented by the entity responsible for finding the correct {@link PipelineExecutor} to * execute a given {@link org.apache.flink.api.dag.Pipeline}. @@ -38,4 +40,9 @@ public interface PipelineExecutorServiceLoader { * loading the registered factories. */ PipelineExecutorFactory getExecutorFactory(final Configuration configuration) throws Exception; + + /** + * Loads and returns a stream of the names of all available executors. + */ + Stream getExecutorNames(); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java index c28ca97e0a4ba..ab4a66b1e71b8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java @@ -74,6 +74,11 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi */ public static class IDReportingExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return EXEC_NAME; + } + @Override public boolean isCompatibleWith(Configuration configuration) { return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET)); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java index bd52c2e4fc1eb..8bdb304db6be0 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java @@ -32,6 +32,11 @@ @Internal public class KubernetesSessionClusterExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return KubernetesSessionClusterExecutor.NAME; + } + @Override public boolean isCompatibleWith(@Nonnull final Configuration configuration) { return configuration.get(DeploymentOptions.TARGET) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java index 5c6a8c2415d2b..93e62485a479e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java @@ -47,6 +47,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.hamcrest.Matchers.is; @@ -133,6 +134,12 @@ public SavepointRestoreSettings getActualSavepointRestoreSettings() { @Override public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) { return new PipelineExecutorFactory() { + + @Override + public String getName() { + return "my-name"; + } + @Override public boolean isCompatibleWith(@Nonnull Configuration configuration) { return true; @@ -154,6 +161,11 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) { } }; } + + @Override + public Stream getExecutorNames() { + throw new UnsupportedOperationException("not implemented"); + } } private static final class TestClusterClient implements ClusterClient { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java index 4f285cf72ac11..bee2bdfe63dfa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java @@ -74,6 +74,11 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi */ public static class IDReportingExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return EXEC_NAME; + } + @Override public boolean isCompatibleWith(Configuration configuration) { return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET)); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java index 7cec1294cfd85..48b8473cbad80 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java @@ -32,6 +32,11 @@ @Internal public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return YarnJobClusterExecutor.NAME; + } + @Override public boolean isCompatibleWith(@Nonnull final Configuration configuration) { return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET)); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java index a887be5e71478..e5bf2ee78c8fa 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java @@ -32,6 +32,11 @@ @Internal public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory { + @Override + public String getName() { + return YarnSessionClusterExecutor.NAME; + } + @Override public boolean isCompatibleWith(@Nonnull final Configuration configuration) { return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));