From 722f8445b7f8f2df90af9e8c4b3cfa231d71129d Mon Sep 17 00:00:00 2001
From: Wei Zhong
Date: Wed, 13 May 2020 09:22:29 +0800
Subject: [PATCH] [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)

---
 docs/dev/table/sqlClient.md                    | 79 +++++++++++++++++++
 docs/dev/table/sqlClient.zh.md                 | 79 +++++++++++++++++++
 .../apache/flink/table/client/SqlClient.java   | 13 +++
 .../flink/table/client/cli/CliOptions.java     | 11 ++-
 .../table/client/cli/CliOptionsParser.java     | 43 +++++++++-
 5 files changed, 221 insertions(+), 4 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index daa14191935ae..8331efef472bc 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -136,6 +136,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                           properties.
      -h,--help                            Show the help message with
                                           descriptions of all options.
+     -hist,--history                      The file which you want to save the
+                                          command history into. If not
+                                          specified, we will auto-generate one
+                                          under your user's home directory.
      -j,--jar                             A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
@@ -149,8 +153,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
+     -pyarch,--pyArchives                 Add python archive files for job. The
+                                          archive files will be extracted to
+                                          the working directory of python UDF
+                                          worker. Currently only zip-format is
+                                          supported. For each archive file, a
+                                          target directory can be specified. If
+                                          the target directory name is
+                                          specified, the archive file will be
+                                          extracted to a directory with the
+                                          specified name. Otherwise, the
+                                          archive file will be extracted to a
+                                          directory with the same name as the
+                                          archive file. The files uploaded via
+                                          this option are accessible via
+                                          relative path. '#' can be used as the
+                                          separator between the archive file
+                                          path and the target directory name.
+                                          Comma (',') can be used as the
+                                          separator to specify multiple archive
+                                          files. This option can be used to
+                                          upload the virtual environment and
+                                          the data files used in Python UDF
+                                          (e.g.: --pyArchives
+                                          file:///tmp/py37.zip,file:///tmp/data
+                                          .zip#data --pyExecutable
+                                          py37.zip/py37/bin/python). The data
+                                          files could be accessed in Python
+                                          UDF, e.g.: f = open('data/data.txt',
+                                          'r').
+     -pyexec,--pyExecutable               Specify the path of the python
+                                          interpreter used to execute the
+                                          python UDF worker (e.g.:
+                                          --pyExecutable
+                                          /usr/local/bin/python3). The python
+                                          UDF worker depends on Python 3.5+,
+                                          Apache Beam (version == 2.19.0), Pip
+                                          (version >= 7.1.0) and SetupTools
+                                          (version >= 37.0.0). Please ensure
+                                          that the specified environment meets
+                                          the above requirements.
+     -pyfs,--pyFiles                      Attach custom python files for job.
+                                          These files will be added to the
+                                          PYTHONPATH of both the local client
+                                          and the remote python UDF worker. The
+                                          standard python resource file
+                                          suffixes such as .py/.egg/.zip, as
+                                          well as directories, are all
+                                          supported. Comma (',') can be used as
+                                          the separator to specify multiple
+                                          files (e.g.: --pyFiles
+                                          file:///tmp/myresource.zip,hdfs:///$n
+                                          amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements              Specify a requirements.txt file which
+                                          defines the third-party dependencies.
+                                          These dependencies will be installed
+                                          and added to the PYTHONPATH of the
+                                          python UDF worker. A directory which
+                                          contains the installation packages of
+                                          these dependencies can optionally be
+                                          specified. Use '#' as the separator
+                                          if the optional parameter exists
+                                          (e.g.: --pyRequirements
+                                          file:///tmp/requirements.txt#file:///
+                                          tmp/cached_dir).
      -s,--session                         The identifier for a session.
                                           'default' is the default identifier.
+     -u,--update                          Experimental (for testing only!):
+                                          Instructs the SQL Client to
+                                          immediately execute the given update
+                                          statement after starting up. The
+                                          process is shut down after the
+                                          statement has been submitted to the
+                                          cluster and returns an appropriate
+                                          return code. Currently, this feature
+                                          is only supported for INSERT INTO
+                                          statements that declare the target
+                                          sink table.
 {% endhighlight %}
 
 {% top %}
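The `--pyArchives` format described above — comma-separated entries, each carrying an optional `#<target directory>` suffix — is the densest part of the new help text. The following minimal Java sketch of that value format may help; it is illustrative only, not code from this patch, and the class and method names are invented:

{% highlight java %}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the --pyArchives value format: comma-separated
// entries, each optionally suffixed with '#<target directory>'.
public class PyArchivesFormatExample {

	// Maps each archive path to its target directory name; without a '#'
	// suffix the target defaults to the archive file's own name.
	static Map<String, String> parse(String value) {
		Map<String, String> result = new LinkedHashMap<>();
		for (String entry : value.split(",")) {
			String[] parts = entry.split("#", 2);
			String archive = parts[0];
			String target = parts.length == 2
					? parts[1]
					: archive.substring(archive.lastIndexOf('/') + 1);
			result.put(archive, target);
		}
		return result;
	}

	public static void main(String[] args) {
		// Prints: {file:///tmp/py37.zip=py37.zip, file:///tmp/data.zip=data}
		System.out.println(parse("file:///tmp/py37.zip,file:///tmp/data.zip#data"));
	}
}
{% endhighlight %}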
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 44355c8955145..c234390df014f 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -137,6 +137,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                           properties.
      -h,--help                            Show the help message with
                                           descriptions of all options.
+     -hist,--history                      The file which you want to save the
+                                          command history into. If not
+                                          specified, we will auto-generate one
+                                          under your user's home directory.
      -j,--jar                             A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
@@ -150,8 +154,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
+     -pyarch,--pyArchives                 Add python archive files for job. The
+                                          archive files will be extracted to
+                                          the working directory of python UDF
+                                          worker. Currently only zip-format is
+                                          supported. For each archive file, a
+                                          target directory can be specified. If
+                                          the target directory name is
+                                          specified, the archive file will be
+                                          extracted to a directory with the
+                                          specified name. Otherwise, the
+                                          archive file will be extracted to a
+                                          directory with the same name as the
+                                          archive file. The files uploaded via
+                                          this option are accessible via
+                                          relative path. '#' can be used as the
+                                          separator between the archive file
+                                          path and the target directory name.
+                                          Comma (',') can be used as the
+                                          separator to specify multiple archive
+                                          files. This option can be used to
+                                          upload the virtual environment and
+                                          the data files used in Python UDF
+                                          (e.g.: --pyArchives
+                                          file:///tmp/py37.zip,file:///tmp/data
+                                          .zip#data --pyExecutable
+                                          py37.zip/py37/bin/python). The data
+                                          files could be accessed in Python
+                                          UDF, e.g.: f = open('data/data.txt',
+                                          'r').
+     -pyexec,--pyExecutable               Specify the path of the python
+                                          interpreter used to execute the
+                                          python UDF worker (e.g.:
+                                          --pyExecutable
+                                          /usr/local/bin/python3). The python
+                                          UDF worker depends on Python 3.5+,
+                                          Apache Beam (version == 2.19.0), Pip
+                                          (version >= 7.1.0) and SetupTools
+                                          (version >= 37.0.0). Please ensure
+                                          that the specified environment meets
+                                          the above requirements.
+     -pyfs,--pyFiles                      Attach custom python files for job.
+                                          These files will be added to the
+                                          PYTHONPATH of both the local client
+                                          and the remote python UDF worker. The
+                                          standard python resource file
+                                          suffixes such as .py/.egg/.zip, as
+                                          well as directories, are all
+                                          supported. Comma (',') can be used as
+                                          the separator to specify multiple
+                                          files (e.g.: --pyFiles
+                                          file:///tmp/myresource.zip,hdfs:///$n
+                                          amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements              Specify a requirements.txt file which
+                                          defines the third-party dependencies.
+                                          These dependencies will be installed
+                                          and added to the PYTHONPATH of the
+                                          python UDF worker. A directory which
+                                          contains the installation packages of
+                                          these dependencies can optionally be
+                                          specified. Use '#' as the separator
+                                          if the optional parameter exists
+                                          (e.g.: --pyRequirements
+                                          file:///tmp/requirements.txt#file:///
+                                          tmp/cached_dir).
      -s,--session                         The identifier for a session.
                                           'default' is the default identifier.
+     -u,--update                          Experimental (for testing only!):
+                                          Instructs the SQL Client to
+                                          immediately execute the given update
+                                          statement after starting up. The
+                                          process is shut down after the
+                                          statement has been submitted to the
+                                          cluster and returns an appropriate
+                                          return code. Currently, this feature
+                                          is only supported for INSERT INTO
+                                          statements that declare the target
+                                          sink table.
 {% endhighlight %}
 
 {% top %}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3f68dc622e225..627e70bf2a70e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.client.cli.CliClient;
 import org.apache.flink.table.client.cli.CliOptions;
 import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -36,7 +37,12 @@
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;
 
 /**
  * SQL Client for submitting SQL statements. The client can be executed in two
@@ -90,6 +96,7 @@ private void start() {
 		// create CLI client with session environment
 		final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
+		appendPythonConfig(sessionEnv, options.getPythonConfiguration());
 		final SessionContext context;
 		if (options.getSessionId() == null) {
 			context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
 		} else {
@@ -166,6 +173,12 @@ private static Environment readSessionEnvironment(URL envUrl) {
 		}
 	}
 
+	private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
+		Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
+		Map<String, Object> combinedConfig = new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
+		env.setConfiguration(combinedConfig);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static void main(String[] args) {
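`appendPythonConfig` above overlays the Python settings parsed from the command line onto the configuration of the session environment. The sketch below shows the intended overlay semantics in isolation, assuming the command-line entries win on key collisions (consistent with the `merge(env.getConfiguration(), create(pythonConfig))` argument order); the `python.files` key and values are used purely for illustration:

{% highlight java %}
import java.util.HashMap;
import java.util.Map;

public class ConfigOverlayExample {
	public static void main(String[] args) {
		// Settings that came from the session environment file.
		Map<String, Object> sessionConfig = new HashMap<>();
		sessionConfig.put("python.files", "file:///from/env.py");
		sessionConfig.put("execution.parallelism", "1");

		// Settings parsed from the Python command line options.
		Map<String, Object> pythonConfig = new HashMap<>();
		pythonConfig.put("python.files", "file:///from/cli.py");

		// Overlay: later entries overwrite earlier ones on the same key.
		Map<String, Object> combined = new HashMap<>(sessionConfig);
		combined.putAll(pythonConfig);

		// python.files now points at file:///from/cli.py;
		// execution.parallelism is untouched.
		System.out.println(combined);
	}
}
{% endhighlight %}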
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 4eaed629abfe9..c211f79627b00 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.net.URL;
 import java.util.List;
 
@@ -35,6 +37,7 @@ public class CliOptions {
 	private final List<URL> libraryDirs;
 	private final String updateStatement;
 	private final String historyFilePath;
+	private final Configuration pythonConfiguration;
 
 	public CliOptions(
 			boolean isPrintHelp,
@@ -44,7 +47,8 @@ public CliOptions(
 			List<URL> jars,
 			List<URL> libraryDirs,
 			String updateStatement,
-			String historyFilePath) {
+			String historyFilePath,
+			Configuration pythonConfiguration) {
 		this.isPrintHelp = isPrintHelp;
 		this.sessionId = sessionId;
 		this.environment = environment;
@@ -53,6 +57,7 @@ public CliOptions(
 		this.libraryDirs = libraryDirs;
 		this.updateStatement = updateStatement;
 		this.historyFilePath = historyFilePath;
+		this.pythonConfiguration = pythonConfiguration;
 	}
 
 	public boolean isPrintHelp() {
@@ -86,4 +91,8 @@ public String getUpdateStatement() {
 	public String getHistoryFilePath() {
 		return historyFilePath;
 	}
+
+	public Configuration getPythonConfiguration() {
+		return pythonConfiguration;
+	}
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index a7626bbb75dc4..2c6b7efd89f72 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.client.SqlClientException;
 
@@ -29,11 +30,18 @@
 import org.apache.commons.cli.ParseException;
 
 import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+
 /**
  * Parser for command line options.
 */
@@ -145,6 +153,10 @@ public static Options getEmbeddedModeClientOptions(Options options) {
 		options.addOption(OPTION_LIBRARY);
 		options.addOption(OPTION_UPDATE);
 		options.addOption(OPTION_HISTORY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -154,6 +166,10 @@ public static Options getGatewayModeClientOptions(Options options) {
 		options.addOption(OPTION_ENVIRONMENT);
 		options.addOption(OPTION_UPDATE);
 		options.addOption(OPTION_HISTORY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -162,6 +178,10 @@ public static Options getGatewayModeGatewayOptions(Options options) {
 		options.addOption(OPTION_DEFAULTS);
 		options.addOption(OPTION_JAR);
 		options.addOption(OPTION_LIBRARY);
+		options.addOption(PYFILES_OPTION);
+		options.addOption(PYREQUIREMENTS_OPTION);
+		options.addOption(PYARCHIVE_OPTION);
+		options.addOption(PYEXEC_OPTION);
 		return options;
 	}
 
@@ -249,7 +269,8 @@ public static CliOptions parseEmbeddedModeClient(String[] args) {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -269,7 +290,8 @@ public static CliOptions parseGatewayModeClient(String[] args) {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+				line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -289,7 +311,8 @@ public static CliOptions parseGatewayModeGateway(String[] args) {
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
 				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
 				null,
-				null
+				null,
+				getPythonConfiguration(line)
 			);
 		}
 		catch (ParseException e) {
@@ -331,4 +354,18 @@ private static String checkSessionId(CommandLine line) {
 		}
 		return sessionId;
 	}
+
+	private static Configuration getPythonConfiguration(CommandLine line) {
+		try {
+			Class<?> clazz = Class.forName(
+				"org.apache.flink.python.util.PythonDependencyUtils",
+				true,
+				Thread.currentThread().getContextClassLoader());
+			Method parsePythonDependencyConfiguration =
+				clazz.getMethod("parsePythonDependencyConfiguration", CommandLine.class);
+			return (Configuration) parsePythonDependencyConfiguration.invoke(null, line);
+		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new SqlClientException("Failed to parse the Python command line options.", e);
+		}
+	}
 }
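`getPythonConfiguration` above resolves `PythonDependencyUtils` reflectively, so flink-sql-client needs no compile-time dependency on the optional flink-python module; any reflective failure, including an absent module, surfaces as a `SqlClientException`. The following self-contained sketch shows the same lookup pattern, adding a graceful fallback on `ClassNotFoundException` for contrast. The names `com.example.OptionalFeature` and `describe` are invented for the example:

{% highlight java %}
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveLookupExample {

	private static final String FEATURE_CLASS = "com.example.OptionalFeature";

	static String describeFeature(String input) {
		try {
			// Load the optional class at runtime instead of linking to it.
			Class<?> clazz = Class.forName(
					FEATURE_CLASS,
					true,
					Thread.currentThread().getContextClassLoader());
			Method describe = clazz.getMethod("describe", String.class);
			// Static method, so the receiver argument is null.
			return (String) describe.invoke(null, input);
		} catch (ClassNotFoundException e) {
			// The optional jar is not on the classpath; degrade gracefully.
			return "feature unavailable: " + input;
		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
			throw new IllegalStateException("Failed to invoke the optional feature.", e);
		}
	}

	public static void main(String[] args) {
		// Prints the fallback unless com.example.OptionalFeature is on the classpath.
		System.out.println(describeFeature("hello"));
	}
}
{% endhighlight %}

Whether to fail fast, as the patch does, or to degrade gracefully is a policy choice; failing fast makes a missing flink-python jar visible the moment the SQL Client parses its options.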