Skip to content

Commit

Permalink
[FLINK-14501] Wired ClusterClientFactories to production code
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Oct 31, 2019
1 parent d1aa63f commit e444874
Show file tree
Hide file tree
Showing 23 changed files with 406 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* a ZooKeeper namespace.
*
*/
public abstract class AbstractCustomCommandLine<T> implements CustomCommandLine<T> {
public abstract class AbstractCustomCommandLine implements CustomCommandLine {

protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
Expand Down Expand Up @@ -64,7 +67,6 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.net.URL;
Expand All @@ -81,6 +83,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Implementation of a simple command line frontend for executing programs.
*/
Expand All @@ -104,25 +108,35 @@ public class CliFrontend {

private final Configuration configuration;

private final List<CustomCommandLine<?>> customCommandLines;
private final List<CustomCommandLine> customCommandLines;

private final Options customCommandLineOptions;

private final Duration clientTimeout;

private final int defaultParallelism;

private final ClusterClientServiceLoader clusterClientServiceLoader;

public CliFrontend(
Configuration configuration,
List<CustomCommandLine<?>> customCommandLines) {
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
List<CustomCommandLine> customCommandLines) {
this(configuration, new DefaultClusterClientServiceLoader(), customCommandLines);
}

public CliFrontend(
Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) {
this.configuration = checkNotNull(configuration);
this.customCommandLines = checkNotNull(customCommandLines);
this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

this.customCommandLineOptions = new Options();

for (CustomCommandLine<?> customCommandLine : customCommandLines) {
for (CustomCommandLine customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
Expand Down Expand Up @@ -194,24 +208,27 @@ protected void run(String[] args) throws Exception {
throw new CliArgsException("Could not build the program from JAR file.", e);
}

final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
try {
runProgram(customCommandLine, executorConfig, runOptions, program);
runProgram(executorConfig, runOptions, program);
} finally {
program.deleteExtractedLibraries();
}
}

private <T> void runProgram(
CustomCommandLine<T> customCommandLine,
Configuration executorConfig,
RunOptions runOptions,
PackagedProgram program) throws ProgramInvocationException, FlinkException {
final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(executorConfig);

final ClusterClientFactory<T> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
checkNotNull(clusterClientFactory);

final ClusterDescriptor<T> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);

try {
final T clusterId = customCommandLine.getClusterId(executorConfig);
final T clusterId = clusterClientFactory.getClusterId(executorConfig);

final ClusterClient<T> client;

Expand All @@ -221,7 +238,7 @@ private <T> void runProgram(

final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
client = clusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
Expand All @@ -242,7 +259,7 @@ private <T> void runProgram(
} else {
// also in job mode we have to deploy a session cluster because the job
// might consist of multiple parts (e.g. when using collect)
final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(executorConfig);
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
// there's a race-condition here if cli is killed before shutdown hook is installed
Expand Down Expand Up @@ -395,7 +412,7 @@ protected void list(String[] args) throws Exception {
showAll = listOptions.showAll();
}

final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);

runClusterAction(
activeCommandLine,
Expand Down Expand Up @@ -513,7 +530,7 @@ protected void stop(String[] args) throws Exception {

logAndSysout((advanceToEndOfEventTime ? "Draining job " : "Suspending job ") + "\"" + jobId + "\" with a savepoint.");

final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
runClusterAction(
activeCommandLine,
commandLine,
Expand Down Expand Up @@ -550,7 +567,7 @@ protected void cancel(String[] args) throws Exception {
return;
}

final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);

final String[] cleanedArgs = cancelOptions.getArgs();

Expand Down Expand Up @@ -635,7 +652,7 @@ protected void savepoint(String[] args) throws Exception {
return;
}

final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);

if (savepointOptions.isDispose()) {
runClusterAction(
Expand Down Expand Up @@ -908,21 +925,22 @@ private JobID parseJobId(String jobIdString) throws CliArgsException {
* @param activeCommandLine to create the {@link ClusterDescriptor} from
* @param commandLine containing the parsed command line options
* @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
* @param <T> type of the cluster id
* @param <ClusterID> type of the cluster id
* @throws FlinkException if something goes wrong
*/
private <T> void runClusterAction(CustomCommandLine<T> activeCommandLine, CommandLine commandLine, ClusterAction<T> clusterAction) throws FlinkException {
private <ClusterID> void runClusterAction(CustomCommandLine activeCommandLine, CommandLine commandLine, ClusterAction<ClusterID> clusterAction) throws FlinkException {
final Configuration executorConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
final ClusterDescriptor<T> clusterDescriptor = activeCommandLine.createClusterDescriptor(executorConfig);

final T clusterId = activeCommandLine.getClusterId(executorConfig);
final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);

if (clusterId == null) {
throw new FlinkException("No cluster id was specified. Please specify a cluster to which " +
"you would like to connect.");
} else {
try {
final ClusterClient<T> clusterClient = clusterDescriptor.retrieve(clusterId);
final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId);

try {
clusterAction.runAction(clusterClient);
Expand Down Expand Up @@ -1052,7 +1070,7 @@ public static void main(final String[] args) {
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

// 3. load the custom command lines
final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);

Expand Down Expand Up @@ -1117,8 +1135,8 @@ static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress
config.setInteger(RestOptions.PORT, address.getPort());
}

public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
List<CustomCommandLine> customCommandLines = new ArrayList<>(2);

// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
Expand Down Expand Up @@ -1150,8 +1168,8 @@ public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration co
* @param commandLine The input to the command-line.
* @return custom command-line which is active (may only be one at a time)
*/
public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine<?> cli : customCommandLines) {
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
for (CustomCommandLine cli : customCommandLines) {
if (cli.isActive(commandLine)) {
return cli;
}
Expand All @@ -1164,7 +1182,7 @@ public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine)
* @param className The fully-qualified class name to load.
* @param params The constructor parameters
*/
private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws Exception {

Class<? extends CustomCommandLine> customCliClass =
Class.forName(className).asSubclass(CustomCommandLine.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private static Options getSavepointOptionsWithoutDeprecatedOptions(Options optio
/**
* Prints the help for the client.
*/
public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
System.out.println();
System.out.println("The following actions are available:");
Expand All @@ -321,7 +321,7 @@ public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines
System.out.println();
}

public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
Expand Down Expand Up @@ -349,7 +349,7 @@ public static void printHelpForInfo() {
System.out.println();
}

public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
Expand All @@ -364,7 +364,7 @@ public static void printHelpForList(Collection<CustomCommandLine<?>> customComma
System.out.println();
}

public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
Expand All @@ -379,7 +379,7 @@ public static void printHelpForStop(Collection<CustomCommandLine<?>> customComma
System.out.println();
}

public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
Expand All @@ -394,7 +394,7 @@ public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCom
System.out.println();
}

public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
Expand All @@ -415,7 +415,7 @@ public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> custom
* @param runOptions True if the run options should be printed, False to print only general options
*/
private static void printCustomCliOptions(
Collection<CustomCommandLine<?>> customCommandLines,
Collection<CustomCommandLine> customCommandLines,
HelpFormatter formatter,
boolean runOptions) {
// prints options from all available command-line classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@

package org.apache.flink.client.cli;

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nullable;

/**
* Custom command-line interface to load hooks for the command-line interface.
*/
public interface CustomCommandLine<T> {
public interface CustomCommandLine {

/**
* Signals whether the custom command-line wants to execute or not.
Expand Down Expand Up @@ -67,34 +63,6 @@ public interface CustomCommandLine<T> {
*/
Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException;

/**
* Create a {@link ClusterDescriptor} from the given configuration.
*
* @param configuration containing the configuration options relevant for the {@link ClusterDescriptor}
* @return the corresponding {@link ClusterDescriptor}.
*/
ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);

/**
* Returns the cluster id if a cluster id is specified in the provided configuration, otherwise it returns {@code null}.
*
* <p>A cluster id identifies a running cluster, e.g. the Yarn application id for a Flink cluster running on Yarn.
*
* @param configuration containing the configuration options relevant for the cluster id retrieval
* @return Cluster id identifying the cluster to deploy jobs to or null
*/
@Nullable
T getClusterId(Configuration configuration);

/**
* Returns the {@link ClusterSpecification} specified by the configuration and the command
* line options. This specification can be used to deploy a new Flink cluster.
*
* @param configuration containing the configuration options relevant for the {@link ClusterSpecification}
* @return the corresponding {@link ClusterSpecification} for a new Flink cluster
*/
ClusterSpecification getClusterSpecification(Configuration configuration);

default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) throws CliArgsException {
final Options options = new Options();
addGeneralOptions(options);
Expand Down
Loading

0 comments on commit e444874

Please sign in to comment.