Skip to content

Latest commit

 

History

History
1040 lines (901 loc) · 44.5 KB

sql.md

File metadata and controls

1040 lines (901 loc) · 44.5 KB
title nav-parent_id nav-pos
SQL
tableapi
30

SQL queries are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or written to a TableSink). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.

In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, DataStream, or DataSet. Alternatively, users can also register external catalogs in a TableEnvironment to specify the location of the data sources.

For convenience Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. Hence, Table objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.

Note: Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

  • This will be replaced by the TOC {:toc}

Specifying a Query

The following examples show how to specify a SQL queries on registered and inlined tables.

{% highlight java %} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table Table table = tableEnv.fromDataStream(ds, "user, product, amount"); Table result = tableEnv.sqlQuery( "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table // register the DataStream as table "Orders" tableEnv.registerDataStream("Orders", ds, "user, product, amount"); // run a SQL query on the Table and retrieve the result as a new Table Table result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// SQL update with a registered table // create and register a TableSink TableSink csvSink = new CsvTableSink("/path/to/file", ...); String[] fieldNames = {"product", "amount"}; TypeInformation[] fieldTypes = {Types.STRING, Types.INT}; tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink); // run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); {% endhighlight %}

{% highlight scala %} val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table val table = ds.toTable(tableEnv, 'user, 'product, 'amount) val result = tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount) // run a SQL query on the Table and retrieve the result as a new Table val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// SQL update with a registered table // create and register a TableSink val csvSink: CsvTableSink = new CsvTableSink("/path/to/file", ...) val fieldNames: Array[String] = Array("product", "amount") val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT) tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink) // run a SQL update query on the Table and emit the result to the TableSink tableEnv.sqlUpdate( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") {% endhighlight %}

{% top %}

Supported Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL. DDL statements are not supported by Flink.

The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.

{% highlight sql %}

insert: INSERT INTO tableReference query

query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem: expression [ ASC | DESC ]

select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* }

projectItem: expression [ [ AS ] columnAlias ] | tableAlias . *

tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition: ON booleanExpression | USING '(' column [, column ]* ')'

tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')'

values: VALUES expression [, expression ]*

groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef: windowName | windowSpec

windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')'

matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')'

measureColumn: expression AS alias

pattern: patternTerm [ '|' patternTerm ]*

patternTerm: patternFactor [ patternFactor ]*

patternFactor: variable [ patternQuantifier ]

patternQuantifier: '' | '?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'

{% endhighlight %}

Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java:

  • The case of identifiers is preserved whether or not they are quoted.
  • After which, identifiers are matched case-sensitively.
  • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. "SELECT a AS my field FROM t").

String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). Duplicate a single quote for escaping (e.g., SELECT 'It''s me.'). Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:

  • Use the backslash (\) as escaping character (default): SELECT U&'\263A'
  • Use a custom escaping character: SELECT U&'#263A' UESCAPE '#'

{% top %}

Operations

Scan, Projection, and Filter

Operation Description
Scan / Select / As
Batch Streaming
{% highlight sql %} SELECT * FROM Orders

SELECT a, c AS d FROM Orders {% endhighlight %}

Where / Filter
Batch Streaming
{% highlight sql %} SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0 {% endhighlight %}

User-defined Scalar Functions (Scalar UDF)
Batch Streaming

UDFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register scalar UDFs.

{% highlight sql %} SELECT PRETTY_PRINT(user) FROM Orders {% endhighlight %}

{% top %}

Aggregations

Operation Description
GroupBy Aggregation
Batch Streaming
Result Updating

Note: GroupBy on a streaming table produces an updating result. See the Dynamic Tables Streaming Concepts page for details.

{% highlight sql %} SELECT a, SUM(b) as d FROM Orders GROUP BY a {% endhighlight %}
GroupBy Window Aggregation
Batch Streaming

Use a group window to compute a single result row per group. See Group Windows section for more details.

{% highlight sql %} SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user {% endhighlight %}
Over Window aggregation
Streaming

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute

{% highlight sql %} SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
{% endhighlight %}

Distinct
Batch Streaming
Result Updating
{% highlight sql %} SELECT DISTINCT users FROM Orders {% endhighlight %}

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

Grouping sets, Rollup, Cube
Batch
{% highlight sql %} SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product)) {% endhighlight %}
Having
Batch Streaming
{% highlight sql %} SELECT SUM(amount) FROM Orders GROUP BY users HAVING SUM(amount) > 50 {% endhighlight %}
User-defined Aggregate Functions (UDAGG)
Batch Streaming

UDAGGs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDAGGs.

{% highlight sql %} SELECT MyAggregate(amount) FROM Orders GROUP BY users {% endhighlight %}

{% top %}

Joins

Operation Description
Inner Equi-join
Batch Streaming

Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.

{% highlight sql %} SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id {% endhighlight %}

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

Outer Equi-join
Batch Streaming Result Updating

Currently, only equi-joins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported.

Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product) which are not supported and would cause a query to fail.

{% highlight sql %} SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id {% endhighlight %}

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

Time-windowed Join
Batch Streaming

Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.

    <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming/time_attributes.html">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p>
    <p>For example, the following predicates are valid window join conditions:</p>

    <ul>
      <li><code>ltime = rtime</code></li>
      <li><code>ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE</code></li>
      <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
    </ul>

{% highlight sql %} SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime {% endhighlight %}

The example above will join all orders with their corresponding shipments if the order was shipped four hours after the order was received.

Expanding arrays into a relation
Batch Streaming

Unnesting WITH ORDINALITY is not supported yet.

{% highlight sql %} SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag) {% endhighlight %}
Join with Table Function
Batch Streaming

Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function.

User-defined table functions (UDTFs) must be registered before. See the UDF documentation for details on how to specify and register UDTFs.

    <p><b>Inner Join</b></p>
    <p>A row of the left (outer) table is dropped, if its table function call returns an empty result.</p>

{% highlight sql %} SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag {% endhighlight %}

    <p><b>Left Outer Join</b></p>
    <p>If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.</p>

{% highlight sql %} SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE {% endhighlight %}

    <p><b>Note:</b> Currently, only literal <code>TRUE</code> is supported as predicate for a left outer join against a lateral table.</p>
  </td>
</tr>
<tr>
  <td>
    <strong>Join with Temporal Table</strong><br>
    <span class="label label-primary">Streaming</span>
  </td>
  <td>
    <p><a href="streaming/temporal_tables.html">Temporal tables</a> are tables that track changes over time.</p>
    <p>A <a href="streaming/temporal_tables.html#temporal-table-functions">Temporal table function</a> provides access to the state of a temporal table at a specific point in time.
    The syntax to join a table with a temporal table function is the same as in <i>Join with Table Function</i>.</p>

    <p><b>Note:</b> Currently only inner joins with temporal tables are supported.</p>

    <p>Assuming <i>Rates</i> is a <a href="streaming/temporal_tables.html#temporal-table-functions">temporal table function</a>, the join can be expressed in SQL as follows:</p>

{% highlight sql %} SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency {% endhighlight %}

For more information please check the more detailed temporal tables concept description.

{% top %}

Set Operations

<tr>
  <td>
    <strong>Intersect / Except</strong><br>
    <span class="label label-primary">Batch</span>
  </td>
  <td>

{% highlight sql %} SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) INTERSECT (SELECT user FROM Orders WHERE b = 0) ) {% endhighlight %} {% highlight sql %} SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) EXCEPT (SELECT user FROM Orders WHERE b = 0) ) {% endhighlight %}

<tr>
  <td>
    <strong>In</strong><br>
    <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
  </td>
  <td>
    <p>Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>

{% highlight sql %} SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts ) {% endhighlight %}

Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

<tr>
  <td>
    <strong>Exists</strong><br>
    <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
  </td>
  <td>
    <p>Returns true if the sub-query returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>

{% highlight sql %} SELECT user, amount FROM Orders WHERE product EXISTS ( SELECT product FROM NewProducts ) {% endhighlight %}

Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

Operation Description
Union
Batch
{% highlight sql %} SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION (SELECT user FROM Orders WHERE b = 0) ) {% endhighlight %}
UnionAll
Batch Streaming
{% highlight sql %} SELECT * FROM ( (SELECT user FROM Orders WHERE a % 2 = 0) UNION ALL (SELECT user FROM Orders WHERE b = 0) ) {% endhighlight %}

{% top %}

OrderBy & Limit

<tr>
  <td><strong>Limit</strong><br>
    <span class="label label-primary">Batch</span>
  </td>
  <td>

Note: The LIMIT clause requires an ORDER BY clause. {% highlight sql %} SELECT * FROM Orders ORDER BY orderTime LIMIT 3 {% endhighlight %}

Operation Description
Order By
Batch Streaming
Note: The result of streaming queries must be primarily sorted on an ascending time attribute. Additional sorting attributes are supported.

{% highlight sql %} SELECT * FROM Orders ORDER BY orderTime {% endhighlight %}

{% top %}

Insert

Operation Description
Insert Into
Batch Streaming

Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query.

{% highlight sql %} INSERT INTO OutputTable SELECT users, tag FROM Orders {% endhighlight %}

{% top %}

Group Windows

Group windows are defined in the GROUP BY clause of a SQL query. Just like queries with regular GROUP BY clauses, queries with a GROUP BY clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.

Group Window Function Description
TUMBLE(time_attr, interval) Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
HOP(time_attr, interval, interval) Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second interval parameter) and hops by a specified hop interval (first interval parameter). If the hop interval is smaller than the window size, hopping windows are overlapping. Thus, rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).
SESSION(time_attr, interval) Defines a session time window. Session time windows do not have a fixed duration but their bounds are defined by a time interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example a session window with a 30 minute gap starts when a row is observed after 30 minutes inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream).

Time Attributes

For SQL queries on streaming tables, the time_attr argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the documentation of time attributes to learn how to define time attributes.

For SQL on batch tables, the time_attr argument of the group window function must be an attribute of type TIMESTAMP.

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows as well as time attributes can be selected with the following auxiliary functions:

Auxiliary Function Description
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

Returns the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window.

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

Returns the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window.

Note: The exclusive upper bound timestamp cannot be used as a rowtime attribute in subsequent time-based operations, such as time-windowed joins and group window or over window aggregations.

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

Returns the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window.

The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations.

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations.

Note: Auxiliary functions must be called with exactly same arguments as the group window function in the GROUP BY clause.

The following examples show how to specify SQL queries with group windows on streaming tables.

{% highlight java %} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); // register the DataStream as table "Orders" tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");

// compute SUM(amount) per day (in event-time) Table result1 = tableEnv.sqlQuery( "SELECT user, " + " TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " + " SUM(amount) FROM Orders " + "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

// compute SUM(amount) per day (in processing-time) Table result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");

// compute every hour the SUM(amount) of the last 24 hours in event-time Table result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

// compute SUM(amount) per session with 12 hour inactivity gap (in event-time) Table result4 = tableEnv.sqlQuery( "SELECT user, " + " SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " + " SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " + " SUM(amount) " + "FROM Orders " + "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");

{% endhighlight %}

{% highlight scala %} val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env)

// read a DataStream from an external source val ds: DataStream[(Long, String, Int)] = env.addSource(...) // register the DataStream under the name "Orders" tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)

// compute SUM(amount) per day (in event-time) val result1 = tableEnv.sqlQuery( """ |SELECT | user, | TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, | SUM(amount) | FROM Orders | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user """.stripMargin)

// compute SUM(amount) per day (in processing-time) val result2 = tableEnv.sqlQuery( "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")

// compute every hour the SUM(amount) of the last 24 hours in event-time val result3 = tableEnv.sqlQuery( "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// compute SUM(amount) per session with 12 hour inactivity gap (in event-time) val result4 = tableEnv.sqlQuery( """ |SELECT | user, | SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, | SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd, | SUM(amount) | FROM Orders | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user """.stripMargin)

{% endhighlight %}

{% top %}

Pattern Recognition

Operation Description
MATCH_RECOGNIZE
Streaming

Searches for a given pattern in a streaming table according to the MATCH_RECOGNIZE ISO standard. This makes it possible to express complex event processing (CEP) logic in SQL queries.

For a more detailed description, see the dedicated page for detecting patterns in tables.

{% highlight sql %} SELECT T.aid, T.bid, T.cid FROM MyTable MATCH_RECOGNIZE ( PARTITION BY userid ORDER BY proctime MEASURES A.id AS aid, B.id AS bid, C.id AS cid PATTERN (A B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T {% endhighlight %}

{% top %}

Data Types

The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internally, it also uses Flink's TypeInformation to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between SQL Types, Table API types, and the resulting Java class.

Table API SQL Java type
Types.STRING VARCHAR java.lang.String
Types.BOOLEAN BOOLEAN java.lang.Boolean
Types.BYTE TINYINT java.lang.Byte
Types.SHORT SMALLINT java.lang.Short
Types.INT INTEGER, INT java.lang.Integer
Types.LONG BIGINT java.lang.Long
Types.FLOAT REAL, FLOAT java.lang.Float
Types.DOUBLE DOUBLE java.lang.Double
Types.DECIMAL DECIMAL java.math.BigDecimal
Types.SQL_DATE DATE java.sql.Date
Types.SQL_TIME TIME java.sql.Time
Types.SQL_TIMESTAMP TIMESTAMP(3) java.sql.Timestamp
Types.INTERVAL_MONTHS INTERVAL YEAR TO MONTH java.lang.Integer
Types.INTERVAL_MILLIS INTERVAL DAY TO SECOND(3) java.lang.Long
Types.PRIMITIVE_ARRAY ARRAY e.g. int[]
Types.OBJECT_ARRAY ARRAY e.g. java.lang.Byte[]
Types.MAP MAP java.util.HashMap
Types.MULTISET MULTISET e.g. java.util.HashMap<String, Integer> for a multiset of String
Types.ROW ROW org.apache.flink.types.Row

Generic types and (nested) composite types (e.g., POJOs, tuples, rows, Scala case classes) can be fields of a row as well.

Fields of composite types with arbitrary nesting can be accessed with value access functions.

Generic types are treated as a black box and can be passed on or processed by user-defined functions.

{% top %}

Reserved Keywords

Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround them with backticks (e.g. `value`, `count`).

{% highlight sql %}

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

{% endhighlight %}

{% top %}