From 4cc1da96e88ddc6166181a7f83c891761db283a1 Mon Sep 17 00:00:00 2001 From: Shengkai <33114724+fsk119@users.noreply.github.com> Date: Thu, 26 Nov 2020 09:56:44 +0800 Subject: [PATCH] [FLINK-20314][table-planner] Add rule to remove empty FlinkLogicalCalc --- .../logical/FlinkLogicalCalcRemoveRule.java | 35 +++++++++++++ .../plan/rules/FlinkStreamRuleSets.scala | 4 ++ .../plan/stream/sql/SourceWatermarkTest.xml | 18 +++++++ .../plan/stream/sql/SourceWatermarkTest.scala | 46 +++++++++-------- .../stream/sql/SourceWatermarkITCase.scala | 51 +++++++++++++++++++ 5 files changed, 134 insertions(+), 20 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalCalcRemoveRule.java diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalCalcRemoveRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalCalcRemoveRule.java new file mode 100644 index 0000000000000..0a8974b38d7e1 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalCalcRemoveRule.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; + +import org.apache.calcite.rel.rules.CalcRemoveRule; + +import static org.apache.calcite.rel.rules.CalcRemoveRule.Config; + +/** + * Rule to remove trivial {@link FlinkLogicalCalc}. + */ +public class FlinkLogicalCalcRemoveRule { + public static final CalcRemoveRule INSTANCE = Config.DEFAULT.withOperandSupplier( + b -> b.operand(FlinkLogicalCalc.class) + .predicate(calc -> calc.getProgram().isTrivial()) + .anyInputs()) + .as(Config.class).toRule(); +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index af1ca1b2eaedd..1fc983768a110 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -373,6 +373,10 @@ object FlinkStreamRuleSets { PythonCorrelateSplitRule.INSTANCE, // merge calc after calc transpose FlinkCalcMergeRule.INSTANCE, + // remove the trivial calc that is produced by PushWatermarkIntoTableSourceScanAcrossCalcRule, + // because [[PushWatermarkIntoTableSourceScanAcrossCalcRule]] will push the rowtime computed + // column into the source. After FlinkCalcMergeRule applies, it may produce a trivial calc. + FlinkLogicalCalcRemoveRule.INSTANCE, //Rule that rewrites temporal join with extracted primary key TemporalJoinRewriteWithUniqueKeyRule.INSTANCE, // Rule that splits python ScalarFunctions from java/scala ScalarFunctions. 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml index b9dbf15f7787d..683171d966fb2 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml @@ -16,6 +16,24 @@ See the License for the specific language governing permissions and limitations under the License. --> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala index 7607d2934dfae..c1eaf1ae54350 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.stream.sql import org.apache.flink.table.planner.utils.TableTestBase import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} /** * Tests for watermark push down. 
@@ -86,6 +86,31 @@ class SourceWatermarkTest extends TableTestBase { | ) |""".stripMargin util.tableEnv.executeSql(ddl3) + + val ddl4 = + """ + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | originTime BIGINT METADATA, + | rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(originTime/1000), 'yyyy-MM-dd HH:mm:ss'), + | WATERMARK FOR rowtime AS rowtime + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'bounded' = 'false', + | 'disable-lookup' = 'true', + | 'readable-metadata' = 'originTime:BIGINT' + | ) + |""".stripMargin + + util.tableEnv.executeSql(ddl4) + } + + @Test + def testSimpleWatermarkPushDown(): Unit = { + util.verifyPlan("SELECT a, b, c FROM VirtualTable") } @Test @@ -110,25 +135,6 @@ class SourceWatermarkTest extends TableTestBase { @Test def testWatermarkWithMetadata(): Unit = { - val ddl = - """ - | CREATE TABLE MyTable( - | a INT, - | b BIGINT, - | c TIMESTAMP(3), - | originTime BIGINT METADATA, - | rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(originTime/1000), 'yyyy-MM-dd HH:mm:ss'), - | WATERMARK FOR rowtime AS rowtime - | ) WITH ( - | 'connector' = 'values', - | 'enable-watermark-push-down' = 'true', - | 'bounded' = 'false', - | 'disable-lookup' = 'true', - | 'readable-metadata' = 'originTime:BIGINT' - | ) - |""".stripMargin - - util.tableEnv.executeSql(ddl) util.verifyPlan("SELECT a, b FROM MyTable") } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala index e4f674a0f6537..95b7adbc40bfa 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala @@ -36,6 +36,57 @@ 
import java.time.LocalDateTime import scala.collection.JavaConverters._ class SourceWatermarkITCase extends StreamingTestBase{ + @Test + def testSimpleWatermarkPushDown(): Unit = { + val data = Seq( + row(1, 2L, LocalDateTime.parse("2020-11-21T19:00:05.23")), + row(2, 3L, LocalDateTime.parse("2020-11-21T21:00:05.23")) + ) + + val dataId = TestValuesTableFactory.registerData(data) + + val ddl = + s""" + | CREATE Table VirtualTable ( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | d as c - INTERVAL '5' second, + | WATERMARK FOR d as d + INTERVAL '5' second + | ) with ( + | 'connector' = 'values', + | 'bounded' = 'false', + | 'enable-watermark-push-down' = 'true', + | 'disable-lookup' = 'true', + | 'data-id' = '$dataId' + | ) + |""".stripMargin + + tEnv.executeSql(ddl) + + val expectedWatermarkOutput = Seq( + "2020-11-21T19:00:05.230", + "2020-11-21T21:00:05.230") + val expectedData = Seq( + "1,2,2020-11-21T19:00:05.230", + "2,3,2020-11-21T21:00:05.230" + ) + + val query = "SELECT a, b, c FROM VirtualTable" + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val actualWatermark = TestValuesTableFactory.getWatermarkOutput("VirtualTable") + .asScala + .map(x => TimestampData.fromEpochMillis(x.getTimestamp).toLocalDateTime.toString) + .toList + + assertEquals(expectedWatermarkOutput, actualWatermark) + assertEquals(expectedData.sorted, sink.getAppendResults.sorted) + } + @Test def testWatermarkWithNestedRow(): Unit = { val data = Seq(