[FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell.
This closes apache#7437
仲炜 authored and sunjincheng121 committed Jan 10, 2019
1 parent 120c4fc commit 6826c67
Showing 1 changed file with 111 additions and 3 deletions.
114 changes: 111 additions & 3 deletions docs/ops/scala_shell.md
@@ -36,9 +36,10 @@ cluster, please see the Setup section below.

## Usage

-The shell supports Batch and Streaming.
-Two different ExecutionEnvironments are automatically prebound after startup.
-Use "benv" and "senv" to access the Batch and Streaming environment respectively.
+The shell supports the DataSet API, DataStream API, Table API and SQL.
+Four different environments are automatically prebound after startup.
+Use "benv" and "senv" to access the batch and streaming execution environments respectively.
+Use "btenv" and "stenv" to access the BatchTableEnvironment and StreamTableEnvironment respectively.

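For quick orientation, here is a minimal word-count sketch against the prebound batch environment; it follows the same pattern as the DataSet API example in the next section and assumes only the shell's default imports.

{% highlight scala %}
// minimal sketch: word count on the prebound batch environment "benv"
Scala-Flink> val counts = benv.fromElements("to be or not to be").
    flatMap(_.toLowerCase.split("\\W+")).map((_, 1)).groupBy(0).sum(1)
// in the batch case, print also triggers execution
Scala-Flink> counts.print
{% endhighlight %}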
### DataSet API

@@ -85,6 +86,113 @@ Note that in the Streaming case, the print operation does not trigger execution.

The Flink Shell comes with command history and auto-completion.

### Table API

The example below is a word count program using the Table API:
<div class="codetabs" markdown="1">
<div data-lang="stream" markdown="1">
{% highlight scala %}
Scala-Flink> import org.apache.flink.table.functions.TableFunction
Scala-Flink> val textSource = stenv.fromDataStream(
  senv.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"),
  'text)
Scala-Flink> class $Split extends TableFunction[String] {
    def eval(s: String): Unit = {
      s.toLowerCase.split("\\W+").foreach(collect)
    }
  }
Scala-Flink> val split = new $Split
Scala-Flink> textSource.join(split('text) as 'word).
    groupBy('word).select('word, 'word.count as 'count).
    toRetractStream[(String, Long)].print
Scala-Flink> senv.execute("Table Wordcount")
{% endhighlight %}
</div>
<div data-lang="batch" markdown="1">
{% highlight scala %}
Scala-Flink> import org.apache.flink.table.functions.TableFunction
Scala-Flink> val textSource = btenv.fromDataSet(
  benv.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"),
  'text)
Scala-Flink> class $Split extends TableFunction[String] {
    def eval(s: String): Unit = {
      s.toLowerCase.split("\\W+").foreach(collect)
    }
  }
Scala-Flink> val split = new $Split
Scala-Flink> textSource.join(split('text) as 'word).
    groupBy('word).select('word, 'word.count as 'count).
    toDataSet[(String, Long)].print
{% endhighlight %}
</div>
</div>

Note that the `$` prefix on the TableFunction class name is a workaround for an issue where Scala generates incorrect inner class names for classes defined in the shell.
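The same prefix trick presumably applies to any user-defined function class declared inside the shell, not only to TableFunction. The sketch below is illustrative only (the `$StrRev` function is not part of these docs) and assumes the workaround carries over to a ScalarFunction defined in the same way:

{% highlight scala %}
Scala-Flink> import org.apache.flink.table.functions.ScalarFunction
// illustrative only: the $ prefix is assumed to be needed for shell-defined UDF classes as well
Scala-Flink> class $StrRev extends ScalarFunction {
    def eval(s: String): String = s.reverse
  }
Scala-Flink> btenv.registerFunction("strRev", new $StrRev)
{% endhighlight %}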

### SQL

The following example is a word count program written in SQL:
<div class="codetabs" markdown="1">
<div data-lang="stream" markdown="1">
{% highlight scala %}
Scala-Flink> import org.apache.flink.table.functions.TableFunction
Scala-Flink> val textSource = stenv.fromDataStream(
  senv.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"),
  'text)
Scala-Flink> stenv.registerTable("text_source", textSource)
Scala-Flink> class $Split extends TableFunction[String] {
    def eval(s: String): Unit = {
      s.toLowerCase.split("\\W+").foreach(collect)
    }
  }
Scala-Flink> stenv.registerFunction("split", new $Split)
Scala-Flink> val result = stenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
    FROM text_source
    JOIN LATERAL TABLE(split(text)) AS T(word)
    ON TRUE
    GROUP BY T.word""")
Scala-Flink> result.toRetractStream[(String, Long)].print
Scala-Flink> senv.execute("SQL Wordcount")
{% endhighlight %}
</div>
<div data-lang="batch" markdown="1">
{% highlight scala %}
Scala-Flink> import org.apache.flink.table.functions.TableFunction
Scala-Flink> val textSource = btenv.fromDataSet(
  benv.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"),
  'text)
Scala-Flink> btenv.registerTable("text_source", textSource)
Scala-Flink> class $Split extends TableFunction[String] {
    def eval(s: String): Unit = {
      s.toLowerCase.split("\\W+").foreach(collect)
    }
  }
Scala-Flink> btenv.registerFunction("split", new $Split)
Scala-Flink> val result = btenv.sqlQuery("""SELECT T.word, count(T.word) AS `count`
    FROM text_source
    JOIN LATERAL TABLE(split(text)) AS T(word)
    ON TRUE
    GROUP BY T.word""")
Scala-Flink> result.toDataSet[(String, Long)].print
{% endhighlight %}
</div>
</div>
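As an optional follow-up sketch (not part of the original examples), the `result` table can also be inspected before any rows are printed: `printSchema` shows the derived schema, and the table environment's `explain` returns the execution plan as a string.

{% highlight scala %}
// inspect the query result without executing a job
Scala-Flink> result.printSchema()
Scala-Flink> print(stenv.explain(result))  // for the batch variant use btenv.explain(result)
{% endhighlight %}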

## Adding external dependencies
