From 9ee5d6de12ef589dac4ba53ec9ee7cb4de38781c Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Sat, 24 Oct 2020 22:16:04 +0800 Subject: [PATCH] [FLINK-19749][docs] Improve the documentation in 'Table API' page, e.g. typo, sync between the English and Chinese doc, etc This closes #13791. --- docs/dev/table/tableApi.md | 24 +++--- docs/dev/table/tableApi.zh.md | 156 +++++++++++++++++----------------- 2 files changed, 92 insertions(+), 88 deletions(-) diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index ae54768250aad..3b63e83061556 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -24,7 +24,7 @@ under the License. The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a super set of the SQL language and is specially designed for working with Apache Flink. The Table API is a language-integrated API for Scala, Java and Python. Instead of specifying queries as String values as common with SQL, Table API queries are defined in a language-embedded style in Java, Scala or Python with IDE support like autocompletion and syntax validation. -The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes. +The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({% link dev/table/common.md %}) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes. The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](./streaming/time_attributes.html) in streaming or a regular timestamp field in batch. @@ -621,7 +621,7 @@ Table result = orders.addColumns(concat($("c"), "sunny")); Batch Streaming -

Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used.

+

Performs a field add operation. Existing fields will be replaced if the added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.

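A minimal sketch of the duplicate-name rule just described, shown before the basic example below; the column contents are illustrative, and the "last one wins" outcome is the behavior stated above:

{% highlight java %}
// Hypothetical sketch: both added columns are named "desc", so per the
// duplicate-name rule only the last expression is kept.
Table orders = tableEnv.from("Orders");
Table result = orders.addOrReplaceColumns(
    concat($("c"), "sunny").as("desc"),  // shadowed by the next "desc" column
    concat($("c"), "rainy").as("desc")); // this definition wins
{% endhighlight %}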
{% highlight java %} Table orders = tableEnv.from("Orders"); Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc")); @@ -687,7 +687,7 @@ val result = orders.addColumns(concat($"c", "Sunny")) Batch Streaming -

Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used.

+

Performs a field add operation. Existing fields will be replaced if the added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.

{% highlight scala %} val orders = tableEnv.from("Orders"); val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") @@ -755,7 +755,7 @@ result = orders.add_columns(concat(orders.c, 'sunny')) Batch Streaming -

Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used.

+

Performs a field add operation. Existing fields will be replaced if the added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.

{% highlight python %} from pyflink.table.expressions import concat @@ -836,7 +836,7 @@ Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d")); {% highlight java %} Table orders = tableEnv.from("Orders"); Table result = orders - .window(Tumble.over(lit(5).minutes())).on($("rowtime")).as("w")) // define window + .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // define window .groupBy($("a"), $("w")) // group by key and window // access window properties and aggregate .select( @@ -895,7 +895,7 @@ Table groupByDistinctResult = orders // Distinct aggregation on time window group by Table groupByWindowDistinctResult = orders .window(Tumble - .over(lit(5).minutes())) + .over(lit(5).minutes()) .on($("rowtime")) .as("w") ) @@ -1264,7 +1264,7 @@ Table result = left.join(right) .where( and( $("a").isEqual($("d")), - $("ltime").isGreaterEqual($("rtime").minus(lit(5).minutes())), + $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())), $("ltime").isLess($("rtime").plus(lit(10).minutes())) )) .select($("a"), $("b"), $("e"), $("ltime")); @@ -2740,7 +2740,7 @@ A session window is defined by using the `Session` class as follows: ### Over Windows -Over window aggregates are known from standard SQL (`OVER` clause) and defined in the `SELECT` clause of a query. Unlike group windows, which are specified in the `GROUP BY` clause, over windows do not collapse rows. Instead over window aggregates compute an aggregate for each input row over a range of its neighboring rows. +Over window aggregates are known from standard SQL (`OVER` clause) and defined in the `SELECT` clause of a query. Unlike group windows, which are specified in the `GROUP BY` clause, over windows do not collapse rows. Instead over window aggregates compute an aggregate for each input row over a range of its neighboring rows. Over windows are defined using the `window(w: OverWindow*)` clause (using `over_window(*OverWindow)` in Python API) and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table. @@ -3177,7 +3177,7 @@ Table table = input

Similar to a GroupBy Aggregation. Groups the rows on the grouping keys with the following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flatAggregate" with a select statement. And the select statement does not support aggregate functions.

-

Instead of using emitValue to output results, you can also use the emitUpdateWithRetract method. Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated. This method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The emitUpdateWithRetract method will be used in preference to the emitValue method if both methods are defined in the table aggregate function, because the method is treated to be more efficient than emitValue as it can output values incrementally. See Table Aggregation Functions for details.

+

Instead of using emitValue to output results, you can also use the emitUpdateWithRetract method. Different from emitValue, emitUpdateWithRetract emits values that have been updated. This method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new, updated ones. The emitUpdateWithRetract method will be used in preference to the emitValue method if both methods are defined in the table aggregate function, because it is considered more efficient than emitValue as it can output values incrementally. See Table Aggregation Functions for details.

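A hedged sketch of what an emitUpdateWithRetract implementation could look like for a Top2-style function such as the one defined below; the oldFirst bookkeeping field and the emitted (value, rank) tuple shape are assumptions for illustration:

{% highlight java %}
// Hypothetical sketch: emit only the changed top value in retract mode.
// Assumes the accumulator also remembers the previously emitted value.
public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
    if (!java.util.Objects.equals(acc.first, acc.oldFirst)) {
        if (acc.oldFirst != null) {
            out.retract(Tuple2.of(acc.oldFirst, 1)); // retract the stale record first...
        }
        out.collect(Tuple2.of(acc.first, 1));        // ...then send the updated one
        acc.oldFirst = acc.first;
    }
}
{% endhighlight %}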
{% highlight java %} /** * Accumulator for Top2. @@ -3403,7 +3403,7 @@ val table = input

Similar to a GroupBy Aggregation. Groups the rows on the grouping keys with the following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flatAggregate" with a select statement. And the select statement does not support aggregate functions.

-

Instead of using emitValue to output results, you can also use the emitUpdateWithRetract method. Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated. This method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The emitUpdateWithRetract method will be used in preference to the emitValue method if both methods are defined in the table aggregate function, because the method is treated to be more efficient than emitValue as it can output values incrementally. See Table Aggregation Functions for details.

+

Instead of using emitValue to output results, you can also use the emitUpdateWithRetract method. Different from emitValue, emitUpdateWithRetract emits values that have been updated. This method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new, updated ones. The emitUpdateWithRetract method will be used in preference to the emitValue method if both methods are defined in the table aggregate function, because it is considered more efficient than emitValue as it can output values incrementally. See Table Aggregation Functions for details.

{% highlight scala %} import java.lang.{Integer => JInteger} import org.apache.flink.table.api.Types @@ -3576,9 +3576,9 @@ Please see the dedicated page about [data types](types.html). Generic types and (nested) composite types (e.g., POJOs, tuples, rows, Scala case classes) can be fields of a row as well. -Fields of composite types with arbitrary nesting can be accessed with [value access functions]({{ site.baseurl }}/dev/table/functions/systemFunctions.html#value-access-functions). +Fields of composite types with arbitrary nesting can be accessed with [value access functions]({% link dev/table/functions/systemFunctions.md %}#value-access-functions). -Generic types are treated as a black box and can be passed on or processed by [user-defined functions]({{ site.baseurl }}/dev/table/functions/udfs.html). +Generic types are treated as a black box and can be passed on or processed by [user-defined functions]({% link dev/table/functions/udfs.md %}). {% top %} diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md index 579cb2736b1b3..c713569fbced8 100644 --- a/docs/dev/table/tableApi.zh.md +++ b/docs/dev/table/tableApi.zh.md @@ -24,7 +24,7 @@ under the License. The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a super set of the SQL language and is specially designed for working with Apache Flink. The Table API is a language-integrated API for Scala, Java and Python. Instead of specifying queries as String values as common with SQL, Table API queries are defined in a language-embedded style in Java, Scala or Python with IDE support like autocompletion and syntax validation. -The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({{ site.baseurl }}/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes. +The Table API shares many concepts and parts of its API with Flink's SQL integration. Have a look at the [Common Concepts & API]({% link dev/table/common.zh.md %}) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes. The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](./streaming/time_attributes.html) in streaming or a regular timestamp field in batch. @@ -101,9 +101,7 @@ val result = orders
-使用`from pyflink.table import *`来导入Python Table API。 - -下面这个例子演示了如何组织一个Python Table API程序,以及字符串形式的表达式用法。 +下面这个例子演示了如何组织一个 Python Table API 程序,以及字符串形式的表达式用法。 {% highlight python %} from pyflink.table import * @@ -204,6 +202,8 @@ val result: Table = orders {% highlight python %} # specify table program +from pyflink.table.expressions import col, lit + orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \ @@ -250,7 +250,7 @@ Table orders = tableEnv.from("Orders"); {% endhighlight %} - + Values
Batch Streaming @@ -477,11 +477,11 @@ val result = orders.where($"b" === "red") - From
+ FromPath
批处理 流处理 -

类似于SQL请求中的FROM子句,将一个环境中已注册的表转换成Table对象。

+

类似于 SQL 请求中的 FROM 子句,将一个环境中已注册的表转换成 Table 对象。

{% highlight python %} orders = t_env.from_path("Orders") {% endhighlight %} @@ -490,7 +490,7 @@ orders = t_env.from_path("Orders") FromElements
- Batch Streaming + 批处理 流处理

Similar to the VALUES clause in a SQL query. Produces an inline table out of the provided rows.

@@ -529,19 +529,19 @@ root |-- name: STRING {% endhighlight %} - + Select
批处理 流处理 -

类似于SQL请求中的SELECT子句,执行一个select操作。

+

类似于 SQL 请求中的 SELECT 子句,执行一个 select 操作。

{% highlight python %} orders = t_env.from_path("Orders") result = orders.select(orders.a, orders.c.alias('d')) {% endhighlight %} -

您可以使用星号 (*) 表示选择表中的所有列。

+

你可以使用星号(*)表示选择表中的所有列。

{% highlight python %} from pyflink.table.expressions import col @@ -569,7 +569,7 @@ result = orders.alias("x, y, z, t") 批处理 流处理 -

类似于SQL请求中的WHERE子句,过滤掉表中不满足条件的行。

+

类似于 SQL 请求中的 WHERE 子句,过滤掉表中不满足条件的行。

{% highlight python %} orders = t_env.from_path("Orders") result = orders.where(orders.a == 'red') @@ -614,14 +614,14 @@ Table result = orders.addColumns(concat($("c"), "sunny")); {% endhighlight %} - + AddOrReplaceColumns
Batch Streaming -

Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used.

+

Performs a field add operation. Existing fields will be replaced if the added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.

{% highlight java %} Table orders = tableEnv.from("Orders"); Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc")); @@ -687,7 +687,7 @@ val result = orders.addColumns(concat($"c", "Sunny")) Batch Streaming -

Performs a field add operation. Existing fields will be replaced if add columns name is the same as the existing column name. Moreover, if the added fields have duplicate field name, then the last one is used.

+

Performs a field add operation. Existing fields will be replaced if the added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.

{% highlight scala %} val orders = tableEnv.from("Orders"); val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") @@ -719,7 +719,7 @@ val orders = tableEnv.from("Orders"); val result = orders.renameColumns($"b" as "b2", $"c" as "c2") {% endhighlight %} - +
@@ -783,7 +783,7 @@ result = orders.drop_columns(orders.b, orders.c) 批处理 流处理 -

执行重命名字段操作。参数必须是字段别名(例:b as b2)列表,并且必须是已经存在的字段才能被重命名。

+

执行重命名字段操作。参数必须是字段别名(例:b as b2)列表,并且必须是已经存在的字段才能被重命名。

{% highlight python %} orders = t_env.from_path("Orders") result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2')) @@ -1054,7 +1054,8 @@ orders.groupBy($"users").select($"users", myUdagg.distinct($"points") as "myDist Distinct
- Batch + Batch Streaming
+ Result Updating

Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.

@@ -1062,7 +1063,7 @@ orders.groupBy($"users").select($"users", myUdagg.distinct($"points") as "myDist val orders: Table = tableEnv.from("Orders") val result = orders.distinct() {% endhighlight %} -

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Query Configuration for details.

+

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. If state cleaning is enabled, distinct has to emit messages to prevent premature state eviction in downstream operators, which makes distinct result-updating. See Query Configuration for details.

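As a hedged illustration of the retention advice in the note above, the idle state retention interval can be set on the table environment's TableConfig; the 12/24 hour values are placeholders, and tableEnv is assumed to be the environment from the surrounding examples:

{% highlight java %}
// Sketch: bound the state kept for streaming distinct/aggregation results.
// Time is org.apache.flink.api.common.time.Time; the values are illustrative.
tableEnv.getConfig().setIdleStateRetentionTime(
    Time.hours(12),  // idle state is kept for at least 12 hours
    Time.hours(24)); // idle state is removed at the latest after 24 hours
{% endhighlight %}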
@@ -1085,12 +1086,12 @@ val result = orders.distinct() 结果持续更新 -

类似于SQL的GROUP BY子句。将数据按照指定字段进行分组,之后对各组内数据执行聚合操作。

+

类似于 SQL 的 GROUP BY 子句。将数据按照指定字段进行分组,之后对各组内数据执行聚合操作。

{% highlight python %} orders = t_env.from_path("Orders") result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d')) {% endhighlight %} -

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于聚合操作的类型和分组的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

+

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于聚合操作的类型和分组的数量。你可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

@@ -1117,7 +1118,7 @@ result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w") 流处理 -

类似于SQL中的OVER开窗函数。Over窗口聚合对每一行都进行一次聚合计算,聚合的对象是以当前行的位置为基准,向前向后取一个区间范围内的所有数据。详情请见Over窗口一节。

+

类似于 SQL 中的 OVER 开窗函数。Over 窗口聚合对每一行都进行一次聚合计算,聚合的对象是以当前行的位置为基准,向前向后取一个区间范围内的所有数据。详情请见 Over 窗口一节。

{% highlight python %} from pyflink.table.window import Over from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE @@ -1138,7 +1139,7 @@ result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime) 结果持续更新 -

类似于SQL聚合函数中的的DISTINCT关键字比如COUNT(DISTINCT a)。带有distinct标记的聚合函数只会接受不重复的输入,重复输入将被丢弃。这个去重特性可以在分组聚合(GroupBy Aggregation)分组窗口聚合(GroupBy Window Aggregation)以及Over窗口聚合(Over Window Aggregation)上使用。

+

类似于 SQL 聚合函数中的 DISTINCT 关键字,比如 COUNT(DISTINCT a)。带有 distinct 标记的聚合函数只会接受不重复的输入,重复输入将被丢弃。这个去重特性可以在分组聚合(GroupBy Aggregation)、分组窗口聚合(GroupBy Window Aggregation)以及 Over 窗口聚合(Over Window Aggregation)上使用。

{% highlight python %} from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE @@ -1158,7 +1159,7 @@ result = orders.over_window(Over .alias("w")) \ .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) {% endhighlight %} -

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

+

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。你可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

@@ -1168,12 +1169,12 @@ result = orders.over_window(Over 结果持续更新 -

类似于SQL中的DISTINCT子句。返回去重后的数据。

+

类似于 SQL 中的 DISTINCT 子句。返回去重后的数据。

{% highlight python %} orders = t_env.from_path("Orders") result = orders.distinct() {% endhighlight %} -

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

+

注意: 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。你可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看查询配置

@@ -1247,7 +1248,7 @@ Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d")))

Note: Interval joins are a subset of regular joins that can be processed in a streaming fashion.

-

A interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

+

An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, the following predicates are valid interval join conditions:
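For instance, a hedged Java sketch that mirrors the interval join example shown earlier on this page (the tables left/right and their columns are borrowed from there):

{% highlight java %}
// One valid interval join condition: an equi-join predicate plus two range
// predicates that bound ltime against rtime on both sides.
Table result = left.join(right)
    .where(
        and(
            $("a").isEqual($("d")),                                          // equi-join part
            $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())), // lower time bound
            $("ltime").isLess($("rtime").plus(lit(10).minutes()))            // upper time bound
        ))
    .select($("a"), $("b"), $("e"), $("ltime"));
{% endhighlight %}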