[FLINK-2955] [Documentation] Add operators description in Table API page.

chengxiang li authored and aljoscha committed Nov 19, 2015
Commit 8325bc6, parent 3cf78aa. docs/libs/table.md: 206 additions and 12 deletions.
under the License.

**The Table API is an experimental feature**

Flink provides an API that allows specifying operations using SQL-like expressions. Instead of
manipulating `DataSet` or `DataStream` you work with `Table` on which relational operations can
be performed.

The following dependency must be added to your project when using the Table API:

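In a Maven build this is typically the `flink-table` artifact (the exact artifact name and the version placeholder below are assumptions; check the documentation matching your Flink version):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table</artifactId>
  <version>{{ site.version }}</version>
</dependency>
```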
Note that the Table API is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).

## Scala Table API

The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
implicit conversions that allow
converting a DataSet or DataStream to a Table. This example shows how a DataSet can
be converted to a Table and then converted back to a DataSet:

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

case class WC(word: String, count: Int)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable
val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
{% endhighlight %}
The expression DSL uses Scala symbols to refer to field names and we use code generation to
transform expressions to efficient runtime code. Please note that the conversion to and from
Tables only works when using Scala case classes or Flink POJOs. Please check out
the [programming guide]({{ site.baseurl }}/apis/programming_guide.html) to learn the requirements for a class to be
considered a POJO.
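For reference, a minimal sketch of a class meeting the usual Flink POJO requirements (a public type with a public no-argument constructor, whose fields are either public or reachable via getters and setters); the class name is illustrative, not taken from the original page:

```java
// Illustrative POJO: public class, public no-arg constructor,
// fields public or exposed through getters/setters.
public class WordCount {
    public String word;   // public field: directly usable by Flink
    private int count;    // private field: needs a getter/setter pair

    public WordCount() {} // the no-argument constructor is required

    public WordCount(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}
```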

This is another example that shows how you
can join two Tables:

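The Scala example itself is elided in this diff hunk; as a hedged sketch only (illustrative field names, shown with the Java API for comparison, not the elided original):

```java
// Sketch only: assumes a TableEnvironment `tableEnv` and two DataSets ds1, ds2.
// Field names must be distinct across the two inputs before joining.
Table left  = tableEnv.fromDataSet(ds1, "a, b");
Table right = tableEnv.fromDataSet(ds2, "c, d");
Table joined = left.join(right).where("b = c").select("a, d");
```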
Notice how a DataSet can be converted to a Table by using `as` and specifying new
names for the fields. This can also be used to disambiguate fields before a join operation. Also,
in this example we see that you can also use Strings to specify relational expressions.

Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
description of the expression syntax.

## Java Table API

DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
are supported. They support exactly the same feature set as the expression DSL.
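A hedged sketch of what such String expressions look like in practice (assumes an existing `tableEnv` and a `DataSet<WC> input`; this is not the elided original example):

```java
// String expressions only; the Scala-style symbol DSL is not available in Java.
Table table = tableEnv.fromDataSet(input);
Table filtered = table.filter("count = 2");
DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
```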

## Table API Operators
The Table API provides a domain-specific language for executing language-integrated queries on structured data in Scala and Java.
This section gives a brief overview of the available operators. You can find more details in the [Javadoc]({{site.baseurl}}/api/java/org/apache/flink/api/table/Table.html).
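As a point of reference for the semantics only (plain Java streams, not the Table API), the recurring groupBy-and-sum pattern in the tables below computes the following; the `WC` record and the data are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySemantics {
    record WC(String word, int count) {}

    // Mirrors the semantics of in.groupBy("word").select("word, count.sum as count").
    static Map<String, Integer> wordCounts(List<WC> input) {
        return input.stream()
                .collect(Collectors.groupingBy(WC::word, Collectors.summingInt(WC::count)));
    }

    public static void main(String[] args) {
        List<WC> input = List.of(new WC("hello", 1), new WC("hello", 1), new WC("ciao", 1));
        System.out.println(wordCounts(input).get("hello")); // 2
    }
}
```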

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

<br />

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Operators</th>
<th class="text-center">Description</th>
</tr>
</thead>

<tbody>
<tr>
<td><strong>Select</strong></td>
<td>
<p>Similar to a SQL SELECT statement. Perform a select operation.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.select("a, c as d");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>As</strong></td>
<td>
<p>Rename fields.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.as("d, e, f");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Filter</strong></td>
<td>
<p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.filter("a % 2 = 0");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Where</strong></td>
<td>
<p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.where("b = 'red'");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>GroupBy</strong></td>
<td>
        <p>Similar to a SQL GROUP BY clause. Group the elements on the grouping keys, with a following aggregation operator to aggregate on a per-group basis.</p>
{% highlight java %}
Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.groupBy("a").select("a, b.sum as d");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Join</strong></td>
<td>
        <p>Similar to a SQL JOIN clause. Join two tables. Both tables must have distinct field names, and a where clause defining the join condition is mandatory.</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Union</strong></td>
<td>
        <p>Similar to a SQL UNION ALL clause. Union two tables; both tables must have identical schemas (field names and types).</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
{% endhighlight %}
</td>
</tr>

</tbody>
</table>
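These operators compose by chaining. A hedged sketch of a small pipeline (table and field names are assumed, not taken from the original page):

```java
// Sketch only: assumes a TableEnvironment `tableEnv` and a DataSet `orders`.
Table in = tableEnv.fromDataSet(orders, "user, product, amount");
Table result = in
    .filter("amount > 0")
    .groupBy("user")
    .select("user, amount.sum as total");
```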

</div>
<div data-lang="scala" markdown="1">
<br />

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Operators</th>
<th class="text-center">Description</th>
</tr>
</thead>

<tbody>
<tr>
<td><strong>Select</strong></td>
<td>
<p>Similar to a SQL SELECT statement. Perform a select operation.</p>
{% highlight scala %}
val in = ds.as('a, 'b, 'c);
val result = in.select('a, 'c as 'd);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>As</strong></td>
<td>
<p>Rename fields.</p>
{% highlight scala %}
val in = ds.as('a, 'b, 'c);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Filter</strong></td>
<td>
<p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
{% highlight scala %}
val in = ds.as('a, 'b, 'c);
val result = in.filter('a % 2 === 0)
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Where</strong></td>
<td>
<p>Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.</p>
{% highlight scala %}
val in = ds.as('a, 'b, 'c);
val result = in.where('b === "red");
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>GroupBy</strong></td>
<td>
        <p>Similar to a SQL GROUP BY clause. Group the elements on the grouping keys, with a following aggregation operator to aggregate on a per-group basis.</p>
{% highlight scala %}
val in = ds.as('a, 'b, 'c);
val result = in.groupBy('a).select('a, 'b.sum as 'd);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>Join</strong></td>
<td>
        <p>Similar to a SQL JOIN clause. Join two tables. Both tables must have distinct field names, and a where clause defining the join condition is mandatory.</p>
{% highlight scala %}
val left = ds1.as('a, 'b, 'c);
val right = ds2.as('d, 'e, 'f);
val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
{% endhighlight %}