[FLINK-12716][python] Add an interactive shell for Python Table API
This closes apache#8675
HuangXingBo authored and sunjincheng121 committed Jun 14, 2019
1 parent 9e60763 commit 76ec612
Showing 14 changed files with 1,066 additions and 15 deletions.
207 changes: 207 additions & 0 deletions docs/ops/python_shell.md
@@ -0,0 +1,207 @@
---
title: "Python REPL"
nav-parent_id: ops
nav-pos: 7
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

Flink comes with an integrated interactive Python Shell.
It can be used in a local setup as well as in a cluster setup.

To use the shell with an integrated Flink cluster, just execute:

{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}

in the root directory of your Flink binary distribution. To run the shell on a
cluster, please see the Setup section below.

## Usage

The shell currently only supports the Table API.
The table environments are automatically prebound after startup.
Use `bt_env` and `st_env` to access the `BatchTableEnvironment` and `StreamTableEnvironment` respectively.

### Table API

The example below is a simple program in the Python shell:
<div class="codetabs" markdown="1">
<div data-lang="stream" markdown="1">
{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> st_config = TableConfig.Builder().set_parallelism(1).as_streaming_execution().build()
>>> st_env = TableEnvironment.create(st_config)
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("stream_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("stream_sink")
>>> st_env.execute()
>>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
</div>
<div data-lang="batch" markdown="1">
{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> bt_config = TableConfig.Builder().set_parallelism(1).as_batch_execution().build()
>>> bt_env = TableEnvironment.create(bt_config)
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> bt_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("batch_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("batch_sink")
>>> bt_env.execute()
>>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
</div>
</div>

## Setup

To get an overview of what options the Python Shell provides, please use

{% highlight bash %}
bin/pyflink-shell.sh --help
{% endhighlight %}

### Local

To use the shell with an integrated Flink cluster, just execute:

{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}


### Remote

To use the shell with a running cluster, start it with the keyword `remote`
and supply the host and port of the JobManager:

{% highlight bash %}
bin/pyflink-shell.sh remote <hostname> <portnumber>
{% endhighlight %}
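
For example, assuming the JobManager of the running cluster is reachable on its
RPC port (6123 by default), the call could look like this; the host name
`flink-master` is purely illustrative:

{% highlight bash %}
bin/pyflink-shell.sh remote flink-master 6123
{% endhighlight %}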

### Yarn Python Shell cluster

The shell can deploy a Flink cluster to YARN, which is used exclusively by the
shell. The number of YARN containers can be controlled by the parameter `-n <arg>`.
The shell deploys a new Flink cluster on YARN and connects to it. You can also
specify options for the YARN cluster, such as the memory for the JobManager, the
name of the YARN application, etc.

For example, to start a YARN cluster for the Python shell with two TaskManagers,
use the following:

{% highlight bash %}
bin/pyflink-shell.sh yarn -n 2
{% endhighlight %}
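
The YARN-specific options from the full reference below can be combined with
this command. A sketch with a few of them; the memory sizes and the application
name are illustrative only:

{% highlight bash %}
bin/pyflink-shell.sh yarn -n 2 -jm 1024 -tm 1024 -nm flink-python-shell
{% endhighlight %}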

For all other options, see the full reference at the bottom.


### Yarn Session

If you have previously deployed a Flink cluster using the Flink Yarn Session,
the Python shell can connect to it with the following command:

{% highlight bash %}
bin/pyflink-shell.sh yarn
{% endhighlight %}
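
A minimal sketch of the whole flow, assuming the session is started beforehand
with the regular `yarn-session.sh` script (its options are omitted here; see the
YARN setup documentation):

{% highlight bash %}
# start a detached YARN session first
./bin/yarn-session.sh -d
# then attach the Python shell to that session
bin/pyflink-shell.sh yarn
{% endhighlight %}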


## Full Reference

{% highlight bash %}
Flink Python Shell
Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink Python shell with a local Flink cluster
usage:
-h,--help Show the help message with descriptions of all options.
Command: remote [options] <host> <port>
Starts Flink Python shell connecting to a remote cluster
<host>
Remote host name as string
<port>
Remote port as integer

usage:
-h,--help Show the help message with descriptions of all options.
Command: yarn [options]
Starts Flink Python shell connecting to a yarn cluster
usage:
-h,--help Show the help message with descriptions of
all options.
-jm,--jobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-n,--container <arg> Number of YARN container to allocate
(=Number of Task Managers)
-nm,--name <arg> Set a custom name for the application on
YARN
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-h | --help
Prints this usage text
{% endhighlight %}

{% top %}