[FLINK-20314][table-planner] Add rule to remove empty FlinkLogicalCalc
fsk119 committed Nov 26, 2020
1 parent 74fd8b7 commit 4cc1da9
Showing 5 changed files with 134 additions and 20 deletions.
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;

import org.apache.calcite.rel.rules.CalcRemoveRule;

import static org.apache.calcite.rel.rules.CalcRemoveRule.Config;

/**
* Rule to remove trivial {@link FlinkLogicalCalc}.
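* A calc is trivial when its {@code RexProgram} simply forwards every input field and has no
* condition, i.e. {@code RexProgram#isTrivial()} returns true.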
*/
public class FlinkLogicalCalcRemoveRule {
public static final CalcRemoveRule INSTANCE = Config.DEFAULT.withOperandSupplier(
b -> b.operand(FlinkLogicalCalc.class)
.predicate(calc -> calc.getProgram().isTrivial())
.anyInputs())
.as(Config.class).toRule();
}
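For context, a minimal, hypothetical sketch (not part of this commit) of how the new rule could be exercised on its own with Calcite's HepPlanner; the class and method names here are invented for illustration, and `root` is assumed to be an already-built logical plan that may contain a trivial FlinkLogicalCalc:

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;

// Hypothetical harness (not part of the commit): applies only
// FlinkLogicalCalcRemoveRule to a given logical plan.
public class FlinkLogicalCalcRemoveRuleSketch {

    public static RelNode removeTrivialCalcs(RelNode root) {
        HepProgramBuilder programBuilder = new HepProgramBuilder();
        programBuilder.addRuleInstance(FlinkLogicalCalcRemoveRule.INSTANCE);

        HepPlanner planner = new HepPlanner(programBuilder.build());
        planner.setRoot(root);

        // Any FlinkLogicalCalc whose RexProgram is trivial matches the rule's
        // predicate and is replaced by its input.
        return planner.findBestExp();
    }
}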
@@ -373,6 +373,10 @@ object FlinkStreamRuleSets {
PythonCorrelateSplitRule.INSTANCE,
// merge calc after calc transpose
FlinkCalcMergeRule.INSTANCE,
// remove the trivial calc that may be produced by PushWatermarkIntoTableSourceScanAcrossCalcRule:
// [[PushWatermarkIntoTableSourceScanAcrossCalcRule]] pushes the rowtime computed column into the
// source, so after FlinkCalcMergeRule is applied a trivial calc may be left behind.
FlinkLogicalCalcRemoveRule.INSTANCE,
// Rule that rewrites temporal join with extracted primary key
TemporalJoinRewriteWithUniqueKeyRule.INSTANCE,
// Rule that splits python ScalarFunctions from java/scala ScalarFunctions.
@@ -16,6 +16,24 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testSimpleWatermarkPushDown">
<Resource name="sql">
<![CDATA[SELECT a, b, c FROM VirtualTable]]>
</Resource>
<Resource name="planBefore">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, VirtualTable]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
TableSourceScan(table=[[default_catalog, default_database, VirtualTable, watermark=[-(+($2, 5000:INTERVAL SECOND), 5000:INTERVAL SECOND)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkWithUdf">
<Resource name="sql">
<![CDATA[SELECT a - b FROM UdfTable]]>
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.table.planner.utils.TableTestBase
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5

import org.junit.{Before, Ignore, Test}
import org.junit.{Before, Test}

/**
* Tests for watermark push down.
@@ -86,6 +86,31 @@ class SourceWatermarkTest extends TableTestBase {
| )
|""".stripMargin
util.tableEnv.executeSql(ddl3)

val ddl4 =
"""
| CREATE TABLE MyTable(
| a INT,
| b BIGINT,
| c TIMESTAMP(3),
| originTime BIGINT METADATA,
| rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(originTime/1000), 'yyyy-MM-dd HH:mm:ss'),
| WATERMARK FOR rowtime AS rowtime
| ) WITH (
| 'connector' = 'values',
| 'enable-watermark-push-down' = 'true',
| 'bounded' = 'false',
| 'disable-lookup' = 'true',
| 'readable-metadata' = 'originTime:BIGINT'
| )
|""".stripMargin

util.tableEnv.executeSql(ddl4)
}

@Test
def testSimpleWatermarkPushDown(): Unit = {
util.verifyPlan("SELECT a, b, c FROM VirtualTable")
}

@Test
@@ -110,25 +135,6 @@

@Test
def testWatermarkWithMetadata(): Unit = {
val ddl =
"""
| CREATE TABLE MyTable(
| a INT,
| b BIGINT,
| c TIMESTAMP(3),
| originTime BIGINT METADATA,
| rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(originTime/1000), 'yyyy-MM-dd HH:mm:ss'),
| WATERMARK FOR rowtime AS rowtime
| ) WITH (
| 'connector' = 'values',
| 'enable-watermark-push-down' = 'true',
| 'bounded' = 'false',
| 'disable-lookup' = 'true',
| 'readable-metadata' = 'originTime:BIGINT'
| )
|""".stripMargin

util.tableEnv.executeSql(ddl)
util.verifyPlan("SELECT a, b FROM MyTable")
}
}
@@ -36,6 +36,57 @@ import java.time.LocalDateTime
import scala.collection.JavaConverters._

class SourceWatermarkITCase extends StreamingTestBase {
@Test
def testSimpleWatermarkPushDown(): Unit = {
val data = Seq(
row(1, 2L, LocalDateTime.parse("2020-11-21T19:00:05.23")),
row(2, 3L, LocalDateTime.parse("2020-11-21T21:00:05.23"))
)

val dataId = TestValuesTableFactory.registerData(data)

val ddl =
s"""
| CREATE Table VirtualTable (
| a INT,
| b BIGINT,
| c TIMESTAMP(3),
| d as c - INTERVAL '5' second,
| WATERMARK FOR d as d + INTERVAL '5' second
| ) with (
| 'connector' = 'values',
| 'bounded' = 'false',
| 'enable-watermark-push-down' = 'true',
| 'disable-lookup' = 'true',
| 'data-id' = '$dataId'
| )
|""".stripMargin

tEnv.executeSql(ddl)

val expectedWatermarkOutput = Seq(
"2020-11-21T19:00:05.230",
"2020-11-21T21:00:05.230")
val expectedData = Seq(
"1,2,2020-11-21T19:00:05.230",
"2,3,2020-11-21T21:00:05.230"
)

val query = "SELECT a, b, c FROM VirtualTable"
val result = tEnv.sqlQuery(query).toAppendStream[Row]
val sink = new TestingAppendSink
result.addSink(sink)
env.execute()

val actualWatermark = TestValuesTableFactory.getWatermarkOutput("VirtualTable")
.asScala
.map(x => TimestampData.fromEpochMillis(x.getTimestamp).toLocalDateTime.toString)
.toList

assertEquals(expectedWatermarkOutput, actualWatermark)
assertEquals(expectedData.sorted, sink.getAppendResults.sorted)
}

@Test
def testWatermarkWithNestedRow(): Unit = {
val data = Seq(
