From 7fba616857a6a71c6a818c008e59af94bc933375 Mon Sep 17 00:00:00 2001 From: hequn8128 Date: Wed, 17 Apr 2019 12:29:10 +0800 Subject: [PATCH] [FLINK-12217][table] OperationTreeBuilder.map() should perform ExpressionResolver.resolve() --- .../table/operations/OperationTreeBuilder.scala | 7 +++++-- .../table/stringexpr/CalcStringExpressionTest.scala | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala index 1579eb45c9381..c6fec0198310b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala @@ -348,12 +348,15 @@ class OperationTreeBuilder(private val tableEnv: TableEnvironment) { def map(mapFunction: Expression, child: TableOperation): TableOperation = { - if (!isScalarFunction(mapFunction)) { + val resolver = resolverFor(tableCatalog, functionCatalog, child).build() + val resolvedMapFunction = resolveSingleExpression(mapFunction, resolver) + + if (!isScalarFunction(resolvedMapFunction)) { throw new ValidationException("Only ScalarFunction can be used in the map operator.") } val expandedFields = new CallExpression(BuiltInFunctionDefinitions.FLATTEN, - List(mapFunction).asJava) + List(resolvedMapFunction).asJava) project(Collections.singletonList(expandedFields), child) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala index 0fd5528354665..999c81ba8f396 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CalcStringExpressionTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.Literal +import org.apache.flink.table.expressions.utils.Func23 import org.apache.flink.table.utils.TableTestBase import org.junit.Test @@ -167,4 +168,16 @@ class CalcStringExpressionTest extends TableTestBase { verifyTableEquals(t1, t2) } + + @Test + def testMap(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c) + util.tableEnv.registerFunction("func", Func23) + + val t1 = t.map("func(a, b, c)") + val t2 = t.map(Func23('a, 'b, 'c)) + + verifyTableEquals(t1, t2) + } }