Skip to content

Commit

Permalink
[FLINK-7854] [table] Reject lateral table outer joins with predicates…
Browse files Browse the repository at this point in the history
… in SQL.

This closes apache#4846.
  • Loading branch information
xccui authored and fhueske committed Oct 19, 2017
1 parent 808e0f9 commit 479be9d
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 4 deletions.
3 changes: 2 additions & 1 deletion docs/dev/table/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,12 @@ FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
</tr>
<tr>
<td>
<strong>User Defined Table Functions (UDTF)</strong><br>
<strong>Join with User Defined Table Functions (UDTF)</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>UDTFs must be registered in the TableEnvironment. See the <a href="udfs.html">UDF documentation</a> for details on how to specify and register UDTFs. </p>
<p><b>Note:</b> Currently only literal <code>TRUE</code> can be accepted as the predicate for the left outer join against a lateral table.</p>
{% highlight sql %}
SELECT users, tag
FROM Orders LATERAL VIEW UNNEST_UDTF(tags) t AS tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql._
import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl}
import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl, SqlValidatorScope}
import org.apache.flink.table.api.ValidationException

/**
* This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
Expand All @@ -48,4 +49,30 @@ class FlinkCalciteSqlValidator(
insert: SqlInsert): RelDataType = {
typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
}

override def validateJoin(join: SqlJoin, scope: SqlValidatorScope): Unit = {
// Due to the improper translation of lateral table left outer join in Calcite, we need to
// temporarily forbid the common predicates until the problem is fixed (see FLINK-7865).
if (join.getJoinType == JoinType.LEFT &&
isCollectionTable(join.getRight)) {
join.getCondition match {
case c: SqlLiteral if c.booleanValue() && c.getValue.asInstanceOf[Boolean] =>
// We accept only literal true
case c if null != c =>
throw new ValidationException(
s"Left outer joins with a table function do not accept a predicte such as $c. " +
s"Only literal TRUE is accepted.")
}
}
super.validateJoin(join, scope)
}

private def isCollectionTable(node: SqlNode): Boolean = {
// TABLE (`func`(`foo`)) AS bar
node match {
case n: SqlCall if n.getKind == SqlKind.AS =>
n.getOperandList.get(0).getKind == SqlKind.COLLECTION_TABLE
case _ => false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase {
}

@Test
def testLeftOuterJoin(): Unit = {
def testLeftOuterJoinWithLiteralTrue(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
Expand All @@ -102,6 +102,46 @@ class CorrelateTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}

@Test
def testLeftOuterJoinAsSubQuery(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
util.addFunction("func1", func1)

val sqlQuery =
"""
| SELECT *
| FROM MyTable2 LEFT OUTER JOIN
| (SELECT c, s
| FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true)
| ON c2 = s """.stripMargin

val expected = binaryNode(
"DataSetJoin",
batchTableNode(1),
unaryNode(
"DataSetCalc",
unaryNode(
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1($cor0.c)"),
term("correlate", "table(func1($cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType","LEFT")
),
term("select", "c", "f0 AS s")
),
term("where", "=(c2, s)"),
term("join", "a2", "b2", "c2", "c", "s"),
term("joinType", "LeftOuterJoin")
)

util.verifySql(sqlQuery, expected)
}

@Test
def testCustomType(): Unit = {
val util = batchTestUtil()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.batch.sql.validation

import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
import org.junit.Test

class CorrelateValidationTest extends TableTestBase{

/**
* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
* join predicate can only be empty or literal true (the restriction should be removed in
* FLINK-7865).
*/
@Test(expected = classOf[ValidationException])
def testLeftOuterJoinWithPredicates(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addFunction("func1", func1)

val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s"

util.verifySql(sqlQuery, "n/a")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class CorrelateTest extends TableTestBase {
}

@Test
def testLeftOuterJoin(): Unit = {
def testLeftOuterJoinWithLiteralTrue(): Unit = {
val util = streamTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
Expand All @@ -102,6 +102,46 @@ class CorrelateTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}

@Test
def testLeftOuterJoinAsSubQuery(): Unit = {
val util = batchTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
util.addFunction("func1", func1)

val sqlQuery =
"""
| SELECT *
| FROM MyTable2 LEFT OUTER JOIN
| (SELECT c, s
| FROM MyTable LEFT OUTER JOIN LATERAL TABLE(func1(c)) AS T(s) on true)
| ON c2 = s """.stripMargin

val expected = binaryNode(
"DataSetJoin",
batchTableNode(1),
unaryNode(
"DataSetCalc",
unaryNode(
"DataSetCorrelate",
batchTableNode(0),
term("invocation", "func1($cor0.c)"),
term("correlate", "table(func1($cor0.c))"),
term("select", "a", "b", "c", "f0"),
term("rowType", "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
term("joinType","LEFT")
),
term("select", "c", "f0 AS s")
),
term("where", "=(c2, s)"),
term("join", "a2", "b2", "c2", "c", "s"),
term("joinType", "LeftOuterJoin")
)

util.verifySql(sqlQuery, expected)
}

@Test
def testCustomType(): Unit = {
val util = streamTestUtil()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.stream.sql.validation

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
import org.junit.Test

class CorrelateValidationTest extends TableTestBase{

/**
* Due to the improper translation of TableFunction left outer join (see CALCITE-2004), the
* join predicate can only be empty or literal true (the restriction should be removed in
* FLINK-7865).
*/
@Test(expected = classOf[ValidationException])
def testLeftOuterJoinWithPredicates(): Unit = {
val util = streamTestUtil()
val func1 = new TableFunc1
util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.addFunction("func1", func1)

val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON c = s"

util.verifySql(sqlQuery, "n/a")
}
}

0 comments on commit 479be9d

Please sign in to comment.