Skip to content

Commit

Permalink
[FLINK-15276] Add method for listing executors to PipelineExecutorSer…
Browse files Browse the repository at this point in the history
…viceLoader

The first use case for this is listing the available executors in the CLI.
  • Loading branch information
aljoscha committed Jan 23, 2020
1 parent aa43ff6 commit d9532e3
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
@Internal
public class LocalExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return LocalExecutor.NAME;
}

@Override
public boolean isCompatibleWith(final Configuration configuration) {
return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
@Internal
public class RemoteExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return RemoteExecutor.NAME;
}

@Override
public boolean isCompatibleWith(final Configuration configuration) {
return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.net.URL;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -356,6 +357,12 @@ private static final class TestExecutorServiceLoader implements PipelineExecutor
@Override
public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new PipelineExecutorFactory() {

@Override
public String getName() {
return "my-name";
}

@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
Expand All @@ -377,5 +384,10 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
}
};
}

@Override
public Stream<String> getExecutorNames() {
throw new UnsupportedOperationException("not implemented");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -82,6 +84,12 @@ public PipelineExecutorFactory getExecutorFactory(final Configuration configurat
return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}

@Override
public Stream<String> getExecutorNames() {
return StreamSupport.stream(defaultLoader.spliterator(), false)
.map(PipelineExecutorFactory::getName);
}

private DefaultExecutorServiceLoader() {
// make sure nobody instantiates us explicitly.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
@Internal
public interface PipelineExecutorFactory {

/**
* Returns the name of the executor that this factory creates.
*/
String getName();

/**
* Returns {@code true} if this factory is compatible with the options in the
* provided configuration, {@code false} otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;

import java.util.stream.Stream;

/**
* An interface to be implemented by the entity responsible for finding the correct {@link PipelineExecutor} to
* execute a given {@link org.apache.flink.api.dag.Pipeline}.
Expand All @@ -38,4 +40,9 @@ public interface PipelineExecutorServiceLoader {
* loading the registered factories.
*/
PipelineExecutorFactory getExecutorFactory(final Configuration configuration) throws Exception;

/**
* Loads and returns a stream of the names of all available executors.
*/
Stream<String> getExecutorNames();
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi
*/
public static class IDReportingExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return EXEC_NAME;
}

@Override
public boolean isCompatibleWith(Configuration configuration) {
return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@Internal
public class KubernetesSessionClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return KubernetesSessionClusterExecutor.NAME;
}

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
return configuration.get(DeploymentOptions.TARGET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -133,6 +134,12 @@ public SavepointRestoreSettings getActualSavepointRestoreSettings() {
@Override
public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
return new PipelineExecutorFactory() {

@Override
public String getName() {
return "my-name";
}

@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return true;
Expand All @@ -154,6 +161,11 @@ public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
}
};
}

@Override
public Stream<String> getExecutorNames() {
throw new UnsupportedOperationException("not implemented");
}
}

private static final class TestClusterClient implements ClusterClient<Object> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ private JobExecutionResult executeTestJobBasedOnConfig(final Configuration confi
*/
public static class IDReportingExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return EXEC_NAME;
}

@Override
public boolean isCompatibleWith(Configuration configuration) {
return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@Internal
public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return YarnJobClusterExecutor.NAME;
}

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
@Internal
public class YarnSessionClusterExecutorFactory implements PipelineExecutorFactory {

@Override
public String getName() {
return YarnSessionClusterExecutor.NAME;
}

@Override
public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
Expand Down

0 comments on commit d9532e3

Please sign in to comment.