Skip to content

Commit

Permalink
[FLINK-29156][table] Support LISTAGG in the Table API
Browse files Browse the repository at this point in the history
This closes apache#20742.
  • Loading branch information
cun8cun8 authored and dianfu committed Oct 3, 2022
1 parent f543b8a commit 397141e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ aggregate:
table: FIELD.lastValue
description: Returns the last value in an ordered set of values.
- sql: LISTAGG(expression [, separator])
table: FIELD.listagg
description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
- sql: CUME_DIST()
description: Return the cumulative distribution of a value in a group of values. The result is the number of rows preceding or equal to the current row in the ordering of the partition divided by the number of rows in the window partition.
Expand Down
1 change: 1 addition & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ aggregate:
table: FIELD.lastValue
description: 返回一组有序值中的最后一个值。
- sql: LISTAGG(expression [, separator])
table: FIELD.listagg
description: 连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。
- sql: CUME_DIST()
description: 返回值在一组值的累积分布。结果是小于或等于当前行的值的行数除以窗口分区的总行数。
Expand Down
10 changes: 10 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,16 @@ def last_value(self) -> 'Expression':
"""
return _unary_op("lastValue")(self)

def list_agg(self, separator: Union[str, 'Expression[str]'] = None) -> 'Expression[str]':
"""
Concatenates the values of string expressions and places separator values between them.
The separator is not added at the end of string. The default value of separator is ‘,’.
"""
if separator is None:
return _unary_op("listAgg")(self)
else:
return _binary_op("listAgg")(self, separator)

@property
def stddev_pop(self) -> 'Expression':
return _unary_op("stddevPop")(self)
Expand Down
1 change: 1 addition & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def test_expression(self):
self.assertEqual('avg(a)', str(expr1.avg))
self.assertEqual('first_value(a)', str(expr1.first_value))
self.assertEqual('last_value(a)', str(expr1.last_value))
self.assertEqual("listAgg(a, ',')", str(expr1.list_agg(",")))
self.assertEqual('stddevPop(a)', str(expr1.stddev_pop))
self.assertEqual('stddevSamp(a)', str(expr1.stddev_samp))
self.assertEqual('varPop(a)', str(expr1.var_pop))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LIKE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LISTAGG;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOCATE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOG;
Expand Down Expand Up @@ -471,6 +472,24 @@ public OutType lastValue() {
return toApiSpecificExpression(unresolvedCall(LAST_VALUE, toExpr()));
}

/**
* Concatenates the values of string expressions and places separator(,) values between them.
* The separator is not added at the end of string.
*/
public OutType listAgg() {
return toApiSpecificExpression(unresolvedCall(LISTAGG, toExpr(), valueLiteral(",")));
}

/**
* Concatenates the values of string expressions and places separator values between them. The
* separator is not added at the end of string. The default value of separator is ‘,’.
*
* @param separator string containing the character
*/
public OutType listAgg(String separator) {
return toApiSpecificExpression(unresolvedCall(LISTAGG, toExpr(), valueLiteral(separator)));
}

/** Returns the population standard deviation of an expression (the square root of varPop()). */
public OutType stddevPop() {
return toApiSpecificExpression(unresolvedCall(STDDEV_POP, toExpr()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(TypeStrategies.aggArg0(t -> t, true))
.build();

public static final BuiltInFunctionDefinition LISTAGG =
BuiltInFunctionDefinition.newBuilder()
.name("listAgg")
.kind(AGGREGATE)
.inputTypeStrategy(sequence(ANY, logical(LogicalTypeFamily.CHARACTER_STRING)))
.outputTypeStrategy(explicit(STRING().nullable()))
.build();

public static final BuiltInFunctionDefinition SUM =
BuiltInFunctionDefinition.newBuilder()
.name("sum")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class SqlAggFunctionVisitor extends ExpressionDefaultVisitor<SqlAggFuncti
BuiltInFunctionDefinitions.FIRST_VALUE, FlinkSqlOperatorTable.FIRST_VALUE);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.LAST_VALUE, FlinkSqlOperatorTable.LAST_VALUE);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.LISTAGG, FlinkSqlOperatorTable.LISTAGG);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
BuiltInFunctionDefinitions.SUM0, FlinkSqlOperatorTable.SUM0);
AGG_DEF_SQL_OPERATOR_MAPPING.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,29 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
val t = failingDataSource(tupleData5)
.toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
.groupBy('e, 'b % 3)
.select('c.min, 'e, 'a.avg, 'd.count, 'b.firstValue(), call("LAST_VALUE", col("c")))
.select(
'c.min,
'e,
'a.avg,
'd.count,
'b.firstValue(),
call("LAST_VALUE", col("c")),
'd.listAgg("-"))

val sink = new TestingRetractSink()
t.toRetractStream[Row].addSink(sink)
env.execute()

val expected = mutable.MutableList(
s"0,1,1,1,1,0",
s"7,1,4,2,8,10",
s"2,1,3,2,3,8",
s"3,2,3,3,4,9",
s"1,2,3,3,2,13",
s"14,2,5,1,15,14",
s"12,3,5,1,13,12",
s"5,3,4,2,6,11")
s"0,1,1,1,1,0,Hallo",
s"1,2,3,3,2,13,Hallo Welt-ABC-JKL",
s"12,3,5,1,13,12,IJK",
s"14,2,5,1,15,14,KLM",
s"2,1,3,2,3,8,Hallo Welt wie-EFG",
s"3,2,3,3,4,9,Hallo Welt wie gehts?-CDE-FGH",
s"5,3,4,2,6,11,BCD-HIJ",
s"7,1,4,2,8,10,DEF-GHI"
)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}

Expand Down

0 comments on commit 397141e

Please sign in to comment.