Skip to content

Commit

Permalink
[FLINK-13076] [table-planner-blink] Support true condition on lookup …
Browse files Browse the repository at this point in the history
…join condition

This closes apache#8962
  • Loading branch information
godfreyhe authored and KurtYoung committed Jul 11, 2019
1 parent 42b04b0 commit 9df0a03
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ object FlinkBatchRuleSets {
* can create new plan nodes.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
TableScanRule.INSTANCE)

val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ object FlinkStreamRuleSets {
* can create new plan nodes.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
TableScanRule.INSTANCE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
import org.apache.flink.table.plan.util.TemporalJoinUtil.{makeProcTimeTemporalJoinConditionCall, makeRowTimeTemporalJoinConditionCall}
import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
import org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, isProctimeAttribute}
import org.apache.flink.util.Preconditions.checkState

import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
Expand All @@ -45,8 +46,7 @@ import org.apache.calcite.rex._
class LogicalCorrelateToJoinFromTemporalTableFunctionRule
extends RelOptRule(
operand(classOf[LogicalCorrelate],
some(
operand(classOf[RelNode], any()),
some(operand(classOf[RelNode], any()),
operand(classOf[TableFunctionScan], none()))),
"LogicalCorrelateToJoinFromTemporalTableFunctionRule") {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.plan.rules.logical

import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
Expand All @@ -29,26 +29,24 @@ import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, R
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
* might be translated into
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
*
* TODO supports `true` join condition
*/
class LogicalCorrelateToJoinFromTemporalTableRule
extends RelOptRule(
operand(classOf[LogicalCorrelate],
operand(classOf[RelNode], any()),
operand(classOf[LogicalFilter],
operand(classOf[LogicalSnapshot], any()))),
"LogicalCorrelateToJoinFromTemporalTableRule") {
abstract class LogicalCorrelateToJoinFromTemporalTableRule(
operand: RelOptRuleOperand,
description: String)
extends RelOptRule(operand, description) {

def getFilterCondition(call: RelOptRuleCall): RexNode

def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot

override def onMatch(call: RelOptRuleCall): Unit = {
val correlate: LogicalCorrelate = call.rel(0)
val leftInput: RelNode = call.rel(1)
val filter: LogicalFilter = call.rel(2)
val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
val filterCondition = getFilterCondition(call)
val snapshot = getLogicalSnapshot(call)

val leftRowType = leftInput.getRowType
val condition = filter.getCondition
val joinCondition = condition.accept(new RexShuttle() {
val joinCondition = filterCondition.accept(new RexShuttle() {
// change correlate variable expression to normal RexInputRef (which is from left side)
override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
fieldAccess.getReferenceExpr match {
Expand Down Expand Up @@ -78,6 +76,54 @@ class LogicalCorrelateToJoinFromTemporalTableRule

}

/**
 * Planner rule that matches a temporal table join whose join condition is not the literal
 * `true`, i.e. the right input of the Correlate is a Filter on top of a Snapshot.
 *
 * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
 * ON T.a = D.id
 */
class LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
  extends LogicalCorrelateToJoinFromTemporalTableRule(
    operand(
      classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter], operand(classOf[LogicalSnapshot], any()))),
    "LogicalCorrelateToJoinFromTemporalTableRuleWithFilter") {

  // Matched rels, in operand order: 0 = Correlate, 1 = left input, 2 = Filter, 3 = Snapshot.

  /** Returns the condition of the matched Filter (rel 2). */
  override def getFilterCondition(call: RelOptRuleCall): RexNode =
    call.rel[LogicalFilter](2).getCondition

  /** Returns the Snapshot that sits below the matched Filter (rel 3). */
  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot =
    call.rel[LogicalSnapshot](3)
}

/**
 * Planner rule that matches a temporal table join whose join condition is the literal `true`,
 * i.e. the right input of the Correlate is the Snapshot itself, with no Filter in between.
 *
 * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON true
 */
class LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
  extends LogicalCorrelateToJoinFromTemporalTableRule(
    operand(
      classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalSnapshot], any())),
    "LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter") {

  // Matched rels, in operand order: 0 = Correlate, 1 = left input, 2 = Snapshot.

  /** There is no Filter in the matched tree, so a literal `true` is used as the condition. */
  override def getFilterCondition(call: RelOptRuleCall): RexNode =
    call.builder().literal(true)

  /** Returns the Snapshot that is the direct right input of the Correlate (rel 2). */
  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot =
    call.rel[LogicalSnapshot](2)
}

object LogicalCorrelateToJoinFromTemporalTableRule {
val INSTANCE = new LogicalCorrelateToJoinFromTemporalTableRule
val WITH_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
val WITHOUT_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,34 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
+- FlinkLogicalCalc(select=[id], where=[>(age, 10)])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]], fields=[id, name, age])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinTemporalTableWithTrueCondition">
<Resource name="sql">
<![CDATA[
SELECT * FROM MyTable AS T
JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
ON true
WHERE T.c > 1000
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
+- LogicalFilter(condition=[>($2, 1000)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
:- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
: +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, id, name, age])
+- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
+- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,34 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[a=id], select=[a, b, c, proctime, rowtime, id, name, age])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinTemporalTableWithTrueCondition">
<Resource name="sql">
<![CDATA[
SELECT * FROM MyTable AS T
JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
ON true
WHERE T.c > 1000
]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalFilter(condition=[>($2, 1000)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalSnapshot(period=[$cor0.proctime])
+- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, rowtime, id, name, age])
+- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,18 @@ class LookupJoinTest extends TableTestBase {
testUtil.verifyPlan(sql)
}

// Verifies the plan of a temporal table join whose ON condition is the literal `true`;
// the only other predicate is a WHERE filter on the left table's column `c`.
@Test
def testJoinTemporalTableWithTrueCondition(): Unit = {
val sql =
"""
|SELECT * FROM MyTable AS T
|JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
|ON true
|WHERE T.c > 1000
""".stripMargin
testUtil.verifyPlan(sql)
}

@Test
def testReusing(): Unit = {
testUtil.tableEnv.getConfig.getConf.setBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,19 @@ class LookupJoinTest extends TableTestBase with Serializable {
streamUtil.verifyPlan(sql)
}

// Verifies the plan of a temporal table join whose ON condition is the literal `true`;
// the only other predicate is a WHERE filter on the left table's column `c`.
@Test
def testJoinTemporalTableWithTrueCondition(): Unit = {
val sql =
"""
|SELECT * FROM MyTable AS T
|JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
|ON true
|WHERE T.c > 1000
""".stripMargin

streamUtil.verifyPlan(sql)
}

// ==========================================================================================

private def expectExceptionThrown(
Expand Down

0 comments on commit 9df0a03

Please sign in to comment.