Skip to content

Commit

Permalink
[FLINK-3640] [docs] extend the Table API docs and add a section about…
Browse files Browse the repository at this point in the history
… embedded SQL mode

This closes apache#1867
  • Loading branch information
vasia committed Apr 11, 2016
1 parent 9e05439 commit ef7f9ac
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/apis/batch/libs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ under the License.

- Graph processing: [Gelly](gelly_guide.html)
- Machine Learning: [FlinkML](ml/index.html)
- Relational Queries: [Table](table.html)
- Relational Queries: [Table and SQL](table.html)
174 changes: 163 additions & 11 deletions docs/apis/batch/libs/table.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
---
title: "Table API - Relational Queries"
title: "Table API and SQL"
is_beta: true
# Top navigation
top-nav-group: libs
top-nav-pos: 3
top-nav-title: "Relational: Table"
top-nav-title: "Table API and SQL"
# Sub navigation
sub-nav-group: batch
sub-nav-parent: libs
sub-nav-pos: 3
sub-nav-title: Table
sub-nav-title: Table API and SQL
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -30,13 +30,24 @@ specific language governing permissions and limitations
under the License.
-->


**The Table API: an experimental feature**

Flink provides an API that allows specifying operations using SQL-like expressions. Instead of
manipulating a `DataSet` you can work with a `Table` on which relational operations can
be performed.
Flink's Table API is a SQL-like expression language embedded in Java and Scala.
Instead of manipulating a `DataSet` or `DataStream`, you can create and work with a relational `Table` abstraction.
Tables have a schema and allow running relational operations on them, including selection, aggregation, and joins.
A `Table` can be created from a `DataSet` or a `DataStream` and then queried either using the Table API operators or using SQL queries.
Once a `Table` is converted back to a `DataSet` or `DataStream`, the defined relational plan is optimized using [Apache Calcite](https://calcite.apache.org/)
and transformed into a `DataSet` or `DataStream` execution plan.

* This will be replaced by the TOC
{:toc}

Using the Table API and SQL
----------------------------

The following dependency must be added to your project in order to use the Table API:
The Table API and SQL are part of the *flink-libraries* Maven project.
The following dependency must be added to your project in order to use the Table API and SQL:

{% highlight xml %}
<dependency>
Expand All @@ -48,7 +59,13 @@ The following dependency must be added to your project in order to use the Table

Note that the Table API is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).

## Scala Table API
Table API
----------
The Table API provides methods for running relational operations on Tables, both in Scala and Java.
In the following sections you can find examples that show how to create Tables, how to define and execute relational queries on them,
and how to retrieve the result of a query as a `DataSet`.

### Scala Table API

The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
implicit conversions that allow
Expand Down Expand Up @@ -90,7 +107,9 @@ in this example we see that you can also use Strings to specify relational expre
Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
description of the expression syntax.

## Java Table API
{% top %}

### Java Table API

When using Java, Tables can be converted to and from DataSet using `TableEnvironment`.
This example is equivalent to the above Scala Example:
Expand Down Expand Up @@ -131,7 +150,9 @@ DataSet<WC> result = tableEnv.toDataSet(wordCounts, WC.class);
When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
are supported. They support exactly the same feature set as the expression DSL.

## Table API Operators
{% top %}

### Table API Operators
The Table API provides a domain-spcific language to execute language-integrated queries on structured data in Scala and Java.
This section gives a brief overview of all available operators. You can find more details of operators in the [Javadoc]({{site.baseurl}}/api/java/org/apache/flink/api/table/Table.html).

Expand Down Expand Up @@ -367,7 +388,9 @@ val result = in.distinct();
</div>
</div>

## Expression Syntax
{% top %}

### Expression Syntax
Some of operators in previous section expect an expression. These can either be specified using an embedded Scala DSL or
a String expression. Please refer to the examples above to learn how expressions can be
formulated.
Expand Down Expand Up @@ -408,3 +431,132 @@ Here, `literal` is a valid Java literal and `field reference` specifies a column
column names follow Java identifier syntax.

Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LONG` casted to `DATE` must be a milliseconds timestamp. A `STRING` casted to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", "`HH:mm:ss`", or a milliseconds timestamp. By default, all timestamps refer to the UTC timezone beginning from January 1, 1970, 00:00:00 in milliseconds.

{% top %}

SQL
----
The Table API also supports embedded SQL queries.
In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan` method:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a Table environment
TableEnvironment tableEnv = new TableEnvironment();
// reset the translation context: this will erase existing registered Tables
TranslationContext.reset();
// read a DataSet from an external source
DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
// register the DataSet under the name "MyTable"
tableEnv.registerDataSet("MyTable", ds);
// retrieve "MyTable" into a new Table
Table t = tableEnv.scan("MyTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
// create a Table environment
val tableEnv = new TableEnvironment
// reset the translation context: this will erase existing registered Tables
TranslationContext.reset()
// read a DataSet from an external source
val ds = env.readCsvFile(...)
// register the DataSet under the name "MyTable"
tableEnv.registerDataSet("MyTable", ds)
// retrieve "MyTable" into a new Table
val t = tableEnv.scan("MyTable")
{% endhighlight %}
</div>
</div>

*Note: Table names are not allowed to follow the `^_DataSetTable_[0-9]+` pattern, as this is reserved for internal use only.*

When registering a `DataSet`, one can also give names to the `Table` columns. For example, if "MyTable" has three columns, `user`, `product`, and `order`, we can give them names upon registering the `DataSet` as shown below:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// register the DataSet under the name "MyTable" with columns user, product, and order
tableEnv.registerDataSet("MyTable", ds, "user, product, order");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// register the DataSet under the name "MyTable" with columns user, product, and order
tableEnv.registerDataSet("MyTable", ds, 'user, 'product, 'order)
{% endhighlight %}
</div>
</div>

A `Table` can be registered in a similar way:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// read a DataSet from an external source
DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
// create a Table from the DataSet with columns user, product, and order
Table t = tableEnv.fromDataSet(ds).as("user, product, order");
// register the Table under the name "MyTable"
tableEnv.registerTable("MyTable", t);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// read a DataSet from an external source and
// create a Table from the DataSet with columns user, product, and order
val t = env.readCsvFile(...).as('user, 'product, 'order)
// register the Table under the name "MyTable"
tableEnv.registerTable("MyTable", t)
{% endhighlight %}
</div>
</div>

After registering a `Table` or `DataSet`, one can use them in SQL queries. A SQL query is defined using the `sql` method of the `TableEnvironment`.
The result of the method is a new `Table` which can either be converted back to a `DataSet` or used in subsequent Table API queries.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a Table environment
TableEnvironment tableEnv = new TableEnvironment();
// reset the translation context: this will erase existing registered Tables
TranslationContext.reset();
// read a DataSet from an external source
DataSet<Tuple2<Integer, Long>> ds = env.readCsvFile(...);
// create a Table from the DataSet
Table t = tableEnv.fromDataSet(ds);
// register the Table under the name "MyTable"
tableEnv.registerTable("MyTable", t);
// run a sql query and retrieve the result in a new Table
Table result = tableEnv.sql("SELECT * FROM MyTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
// create a Table environment
val tableEnv = new TableEnvironment
// reset the translation context: this will erase existing registered Tables
TranslationContext.reset()
// create a Table
val t = env.readCsvFile(...).as('a, 'b, 'c)
// register the Table under the name "MyTable"
tableEnv.registerTable("MyTable", t)
// run a sql query and retrieve the result in a new Table
val result = tableEnv.sql("SELECT * FROM MyTable")
{% endhighlight %}
</div>
</div>

{% top %}

0 comments on commit ef7f9ac

Please sign in to comment.