---
title: "Python REPL"
nav-parent_id: ops
nav-pos: 7
---
Flink comes with an integrated interactive Python Shell.
It can be used in a local setup as well as in a cluster setup.
To use the shell with an integrated Flink cluster just execute:
{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}
from the root directory of your Flink binary distribution. To run the shell on a
cluster, please see the Setup section below.
## Usage
The shell currently only supports the Table API.
The Table Environments are automatically prebound after startup.
Use "bt_env" and "st_env" to access the BatchTableEnvironment and StreamTableEnvironment respectively.
### Table API
The example below is a simple program in the Python shell:
{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/streaming.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> # st_env is the StreamTableEnvironment prebound by the shell
>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> st_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("stream_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("stream_sink")
>>> st_env.execute()
>>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
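The sink above writes plain comma-separated rows, so the result can also be parsed with Python's standard `csv` module. The sketch below is independent of Flink; the sample string mirrors the rows the job above produces (`a + 1` shifts the first column, `b` and `c` pass through unchanged):

```python
import csv
import io

# Sample output mirroring what the streaming job above writes to sink_path.
sample = "2,hi,hello\n3,hi,hello\n"

# Parse each CSV row back into a typed tuple.
rows = [
    (int(a), b, c)
    for a, b, c in csv.reader(io.StringIO(sample))
]
print(rows)  # [(2, 'hi', 'hello'), (3, 'hi', 'hello')]
```

To parse the real sink file, replace `io.StringIO(sample)` with `open(sink_path)`.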
{% highlight python %}
>>> import tempfile
>>> import os
>>> import shutil
>>> sink_path = tempfile.gettempdir() + '/batch.csv'
>>> if os.path.exists(sink_path):
...     if os.path.isfile(sink_path):
...         os.remove(sink_path)
...     else:
...         shutil.rmtree(sink_path)
>>> # bt_env is the BatchTableEnvironment prebound by the shell
>>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>> bt_env.connect(FileSystem().path(sink_path))\
...     .with_format(OldCsv()
...                  .field_delimiter(',')
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .with_schema(Schema()
...                  .field("a", DataTypes.BIGINT())
...                  .field("b", DataTypes.STRING())
...                  .field("c", DataTypes.STRING()))\
...     .register_table_sink("batch_sink")
>>> t.select("a + 1, b, c")\
...     .insert_into("batch_sink")
>>> bt_env.execute()
>>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}
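The cleanup preamble repeated in both examples (remove the sink whether it is a file or a directory) can be factored into a small helper. This is a plain-Python sketch with no Flink dependency; `clear_path` is a hypothetical name, not part of PyFlink:

```python
import os
import shutil
import tempfile

def clear_path(path):
    """Remove a file or a directory tree at path, if it exists."""
    if os.path.exists(path):
        if os.path.isfile(path):
            os.remove(path)
        else:
            shutil.rmtree(path)

# Works whether the sink was written as a single file or as a directory.
sink_path = os.path.join(tempfile.gettempdir(), 'batch.csv')
clear_path(sink_path)
print(os.path.exists(sink_path))  # False
```

Calling it before `register_table_sink` ensures a stale result from a previous run never pollutes the new output.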
## Setup
To get an overview of the options the Python shell provides, run:
{% highlight bash %}
bin/pyflink-shell.sh --help
{% endhighlight %}
### Local
To use the shell with an integrated Flink cluster just execute:
{% highlight bash %}
bin/pyflink-shell.sh local
{% endhighlight %}
### Remote
To use the shell with a running cluster, start it with the keyword `remote`
and supply the host and port of the JobManager:
{% highlight bash %}
bin/pyflink-shell.sh remote <hostname> <port>
{% endhighlight %}