---
title: "Python REPL"
nav-parent_id: ops
nav-pos: 7
---

Flink comes with an integrated interactive Python Shell. It can be used in a local setup as well as in a cluster setup.

To use the shell with an integrated Flink cluster, just execute:

{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}

in the root directory of your binary Flink distribution. To run the shell on a cluster, please see the Setup section below.

## Usage

The shell currently only supports the Table API. The table environments are automatically prebound after startup. Use "bt_env" and "st_env" to access the BatchTableEnvironment and StreamTableEnvironment respectively.

### Table API

The example below is a simple streaming program in the Python shell:
{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> st_config = TableConfig.Builder().set_parallelism(1).as_streaming_execution().build()
>>> st_env = TableEnvironment.create(st_config)
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("stream_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("stream_sink")
>>> st_env.execute()
>>> # If the job runs in local mode, you can execute the following code
>>> # in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
The corresponding batch program looks like this:

{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> bt_config = TableConfig.Builder().set_parallelism(1).as_batch_execution().build()
>>> bt_env = TableEnvironment.create(bt_config)
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> bt_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("batch_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("batch_sink")
>>> bt_env.execute()
>>> # If the job runs in local mode, you can execute the following code
>>> # in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
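The `select("a + 1, b, c")` projection used in both programs increments column `a` and passes `b` and `c` through unchanged. As a plain-Python sketch (no Flink required, purely illustrative) of what that projection does to the example rows:

{% highlight python %}
# Plain-Python illustration of the projection "a + 1, b, c"
# applied to the rows used in the examples above.
rows = [(1, 'hi', 'hello'), (2, 'hi', 'hello')]
projected = [(a + 1, b, c) for (a, b, c) in rows]
print(projected)  # [(2, 'hi', 'hello'), (3, 'hi', 'hello')]
{% endhighlight %}

This matches the two lines written to the CSV sink: `2,hi,hello` and `3,hi,hello`.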
## Setup

To get an overview of the options the Python Shell provides, use:

{% highlight bash %}
bin/pyflink-shell.sh --help
{% endhighlight %}

### Local

To use the shell with an integrated Flink cluster, just execute:

{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}

### Remote

To use it with a running cluster, start the Python shell with the keyword `remote` and supply the host and port of the JobManager:

{% highlight bash %}
bin/pyflink-shell.sh remote <hostname> <portnumber>
{% endhighlight %}

### Yarn Python Shell cluster

The shell can deploy a Flink cluster to YARN, which is used exclusively by the shell. The number of YARN containers can be controlled by the parameter `-n <arg>`. The shell deploys a new Flink cluster on YARN and connects to the cluster. You can also specify options for the YARN cluster such as memory for the JobManager, name of the YARN application, etc.

For example, to start a Yarn cluster for the Python Shell with two TaskManagers, use the following:

{% highlight bash %}
bin/pyflink-shell.sh yarn -n 2
{% endhighlight %}

For all other options, see the full reference at the bottom.

### Yarn Session

If you have previously deployed a Flink cluster using the Flink Yarn Session, the Python shell can connect to it with the following command:

{% highlight bash %}
bin/pyflink-shell.sh yarn
{% endhighlight %}

## Full Reference

{% highlight bash %}
Flink Python Shell
Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink Python shell with a local Flink cluster
  usage:
    -h,--help   Show the help message with descriptions of all options.

Command: remote [options] <host> <port>
Starts Flink Python shell connecting to a remote cluster
  <host>
        Remote host name as string
  <port>
        Remote port as integer
  usage:
    -h,--help   Show the help message with descriptions of all options.

Command: yarn [options]
Starts Flink Python shell connecting to a yarn cluster
  usage:
    -jm,--jobManagerMemory <arg>    Memory for JobManager Container with
                                    optional unit (default: MB)
    -n,--container <arg>            Number of YARN container to allocate
                                    (=Number of Task Managers)
    -nm,--name <arg>                Set a custom name for the application on YARN
    -qu,--queue <arg>               Specify YARN queue.
    -s,--slots <arg>                Number of slots per TaskManager
    -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with
                                    optional unit (default: MB)
  -h | --help
        Prints this usage text
{% endhighlight %}

{% top %}
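As a sketch combining several of the YARN options from the reference above (the values and application name here are illustrative assumptions, not recommendations), a dedicated shell cluster with custom resources could be started like this:

{% highlight bash %}
# Illustrative: 2 TaskManagers (-n 2) with 4 slots each (-s 4),
# 1024 MB for the JobManager, 2048 MB per TaskManager,
# and a custom YARN application name.
bin/pyflink-shell.sh yarn -n 2 -s 4 -jm 1024 -tm 2048 -nm pyflink-shell
{% endhighlight %}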