[FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (apache#12077)
WeiZhong94 committed May 13, 2020
1 parent aa263f8 commit 722f844
Showing 5 changed files with 221 additions and 4 deletions.
79 changes: 79 additions & 0 deletions docs/dev/table/sqlClient.md
@@ -136,6 +136,10 @@ Mode "embedded" submits Flink jobs from the local machine.
properties.
-h,--help Show the help message with
descriptions of all options.
-hist,--history <History file path> The file to which the command
history is saved. If not specified,
one is auto-generated under the
user's home directory.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
@@ -149,8 +153,83 @@ Mode "embedded" submits Flink jobs from the local machine.
statements such as functions, table
sources, or sinks. Can be used
multiple times.
-pyarch,--pyArchives <arg> Add Python archive files for the job.
The archive files will be extracted
to the working directory of the
Python UDF worker. Currently, only
zip format is supported. For each
archive file, a target directory can
be specified. If a target directory
name is specified, the archive file
will be extracted to a directory
with that name. Otherwise, the
archive file will be extracted to a
directory with the same name as the
archive file. The files uploaded via
this option are accessible via
relative paths. '#' can be used as
the separator between the archive
file path and the target directory
name. Comma (',') can be used as the
separator to specify multiple
archive files. This option can be
used to upload a virtual environment
and the data files used in the
Python UDF (e.g.: --pyArchives
file:///tmp/py37.zip,file:///tmp/data.zip#data
--pyExecutable py37.zip/py37/bin/python).
The data files can then be accessed
in the Python UDF, e.g.:
f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> Specify the path of the Python
interpreter used to execute the
Python UDF worker (e.g.:
--pyExecutable /usr/local/bin/python3).
The Python UDF worker depends on
Python 3.5+, Apache Beam
(version == 2.19.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
these requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom Python files to the
job. These files will be added to
the PYTHONPATH of both the local
client and the remote Python UDF
worker. Standard Python resource
file suffixes such as .py/.egg/.zip,
as well as directories, are all
supported. Comma (',') can be used
as the separator to specify multiple
files (e.g.: --pyFiles
file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
-pyreq,--pyRequirements <arg> Specify a requirements.txt file that
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
Python UDF worker. Optionally, a
directory containing the
installation packages of these
dependencies can be specified. Use
'#' as the separator if the optional
parameter exists (e.g.:
--pyRequirements
file:///tmp/requirements.txt#file:///tmp/cached_dir).
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
-u,--update <SQL update statement> Experimental (for testing only!):
Instructs the SQL Client to
immediately execute the given update
statement after starting up. The
process shuts down after the
statement has been submitted to the
cluster, returning an appropriate
return code. Currently, this feature
is only supported for INSERT INTO
statements that declare the target
sink table.
{% endhighlight %}
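
As an illustration, here is a minimal sketch of a Python UDF that reads a data file shipped with `--pyArchives` (the function name `greet`, the file `data/data.txt`, and its contents are hypothetical; it assumes the archive was passed as `file:///tmp/data.zip#data`, so it is extracted under the target directory name `data` in the Python UDF worker's working directory):

{% highlight python %}
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Assumes the SQL Client was started along the lines of:
#   ./bin/sql-client.sh embedded --pyArchives file:///tmp/data.zip#data
# The archive is extracted into the Python UDF worker's working directory,
# so its contents are reachable via the relative path 'data/'.
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def greet(name):
    # 'data/data.txt' is a hypothetical data file inside the archive.
    with open('data/data.txt', 'r') as f:
        template = f.read().strip()
    return template.replace('{name}', name)
{% endhighlight %}

Note that such a UDF must still be registered with the session (for example via the SQL Client environment file) before it can be referenced from SQL queries.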

{% top %}
79 changes: 79 additions & 0 deletions docs/dev/table/sqlClient.zh.md
@@ -137,6 +137,10 @@ Mode "embedded" submits Flink jobs from the local machine.
properties.
-h,--help Show the help message with
descriptions of all options.
-hist,--history <History file path> The file to which the command
history is saved. If not specified,
one is auto-generated under the
user's home directory.
-j,--jar <JAR file> A JAR file to be imported into the
session. The file might contain
user-defined classes needed for the
@@ -150,8 +154,83 @@ Mode "embedded" submits Flink jobs from the local machine.
statements such as functions, table
sources, or sinks. Can be used
multiple times.
-pyarch,--pyArchives <arg> Add Python archive files for the job.
The archive files will be extracted
to the working directory of the
Python UDF worker. Currently, only
zip format is supported. For each
archive file, a target directory can
be specified. If a target directory
name is specified, the archive file
will be extracted to a directory
with that name. Otherwise, the
archive file will be extracted to a
directory with the same name as the
archive file. The files uploaded via
this option are accessible via
relative paths. '#' can be used as
the separator between the archive
file path and the target directory
name. Comma (',') can be used as the
separator to specify multiple
archive files. This option can be
used to upload a virtual environment
and the data files used in the
Python UDF (e.g.: --pyArchives
file:///tmp/py37.zip,file:///tmp/data.zip#data
--pyExecutable py37.zip/py37/bin/python).
The data files can then be accessed
in the Python UDF, e.g.:
f = open('data/data.txt', 'r').
-pyexec,--pyExecutable <arg> Specify the path of the Python
interpreter used to execute the
Python UDF worker (e.g.:
--pyExecutable /usr/local/bin/python3).
The Python UDF worker depends on
Python 3.5+, Apache Beam
(version == 2.19.0), Pip
(version >= 7.1.0) and SetupTools
(version >= 37.0.0). Please ensure
that the specified environment meets
these requirements.
-pyfs,--pyFiles <pythonFiles> Attach custom Python files to the
job. These files will be added to
the PYTHONPATH of both the local
client and the remote Python UDF
worker. Standard Python resource
file suffixes such as .py/.egg/.zip,
as well as directories, are all
supported. Comma (',') can be used
as the separator to specify multiple
files (e.g.: --pyFiles
file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
-pyreq,--pyRequirements <arg> Specify a requirements.txt file that
defines the third-party dependencies.
These dependencies will be installed
and added to the PYTHONPATH of the
Python UDF worker. Optionally, a
directory containing the
installation packages of these
dependencies can be specified. Use
'#' as the separator if the optional
parameter exists (e.g.:
--pyRequirements
file:///tmp/requirements.txt#file:///tmp/cached_dir).
-s,--session <session identifier> The identifier for a session.
'default' is the default identifier.
-u,--update <SQL update statement> Experimental (for testing only!):
Instructs the SQL Client to
immediately execute the given update
statement after starting up. The
process shuts down after the
statement has been submitted to the
cluster, returning an appropriate
return code. Currently, this feature
is only supported for INSERT INTO
statements that declare the target
sink table.
{% endhighlight %}

{% top %}
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -18,6 +18,7 @@

package org.apache.flink.table.client;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -36,7 +37,12 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;

/**
* SQL Client for submitting SQL statements. The client can be executed in two
@@ -90,6 +96,7 @@ private void start() {

// create CLI client with session environment
final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
appendPythonConfig(sessionEnv, options.getPythonConfiguration());
final SessionContext context;
if (options.getSessionId() == null) {
context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
@@ -166,6 +173,12 @@ private static Environment readSessionEnvironment(URL envUrl) {
}
}

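// Merges the Flink configuration derived from the Python command line options
// (e.g. -pyfs, -pyexec, -pyreq, -pyarch) into the session environment's
// configuration entries.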
private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
Map<String, Object> combinedConfig = new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
env.setConfiguration(combinedConfig);
}

// --------------------------------------------------------------------------------------------

public static void main(String[] args) {
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -18,6 +18,8 @@

package org.apache.flink.table.client.cli;

import org.apache.flink.configuration.Configuration;

import java.net.URL;
import java.util.List;

@@ -35,6 +37,7 @@ public class CliOptions {
private final List<URL> libraryDirs;
private final String updateStatement;
private final String historyFilePath;
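// Flink configuration derived from the Python command line options;
// merged into the session environment by SqlClient at startup.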
private final Configuration pythonConfiguration;

public CliOptions(
boolean isPrintHelp,
@@ -44,7 +47,8 @@ public CliOptions(
List<URL> jars,
List<URL> libraryDirs,
String updateStatement,
String historyFilePath) {
String historyFilePath,
Configuration pythonConfiguration) {
this.isPrintHelp = isPrintHelp;
this.sessionId = sessionId;
this.environment = environment;
@@ -53,6 +57,7 @@
this.libraryDirs = libraryDirs;
this.updateStatement = updateStatement;
this.historyFilePath = historyFilePath;
this.pythonConfiguration = pythonConfiguration;
}

public boolean isPrintHelp() {
@@ -86,4 +91,8 @@ public String getUpdateStatement() {
public String getHistoryFilePath() {
return historyFilePath;
}

public Configuration getPythonConfiguration() {
return pythonConfiguration;
}
}
