Skip to content

Commit

Permalink
[FLINK-12217][table] OperationTreeBuilder.map() should perform Expres…
Browse files Browse the repository at this point in the history
…sionResolver.resolve()
  • Loading branch information
hequn8128 authored and dawidwys committed Apr 17, 2019
1 parent 9d0951e commit 7fba616
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,15 @@ class OperationTreeBuilder(private val tableEnv: TableEnvironment) {

def map(mapFunction: Expression, child: TableOperation): TableOperation = {

if (!isScalarFunction(mapFunction)) {
val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
val resolvedMapFunction = resolveSingleExpression(mapFunction, resolver)

if (!isScalarFunction(resolvedMapFunction)) {
throw new ValidationException("Only ScalarFunction can be used in the map operator.")
}

val expandedFields = new CallExpression(BuiltInFunctionDefinitions.FLATTEN,
List(mapFunction).asJava)
List(resolvedMapFunction).asJava)
project(Collections.singletonList(expandedFields), child)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.table.api.stream.table.stringexpr
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
import org.apache.flink.table.expressions.utils.Func23
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

Expand Down Expand Up @@ -167,4 +168,16 @@ class CalcStringExpressionTest extends TableTestBase {

verifyTableEquals(t1, t2)
}

@Test
def testMap(): Unit = {
val util = streamTestUtil()
val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
util.tableEnv.registerFunction("func", Func23)

val t1 = t.map("func(a, b, c)")
val t2 = t.map(Func23('a, 'b, 'c))

verifyTableEquals(t1, t2)
}
}

0 comments on commit 7fba616

Please sign in to comment.