Commit c5bb490

[FLINK-17868][table-planner-blink] Fix proctime in DDL can not work in batch mode

This closes apache#14687

LadyForest authored Jan 22, 2021
1 parent 5b477d3 commit c5bb490
Showing 21 changed files with 255 additions and 182 deletions.
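
For context, a minimal sketch of the scenario this commit fixes (assumptions: a datagen source; the table name and schema loosely mirror the proctime_t plan-test resource further down, which also has computed columns c, d, e). Before this change, a PROCTIME() computed column declared in DDL could not be used in batch mode:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object ProctimeInBatchSketch {
  def main(args: Array[String]): Unit = {
    // Batch mode is the broken case; the same DDL already worked in streaming mode.
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())

    // Hypothetical table: a proctime attribute declared as a computed column in DDL.
    tEnv.executeSql(
      """CREATE TABLE proctime_t (
        |  a INT,
        |  b STRING,
        |  ptime AS PROCTIME()
        |) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '3'
        |)""".stripMargin)

    // After the fix, the batch optimizer rewrites PROCTIME() to
    // PROCTIME_MATERIALIZE(PROCTIME()), as the optimized exec plans below show.
    tEnv.executeSql("SELECT * FROM proctime_t").print()
  }
}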
@@ -22,7 +22,7 @@
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
-import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexProgram;
@@ -32,12 +32,6 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<Row
 
     public StreamExecCalc(
             RexProgram calcProgram, ExecEdge inputEdge, RowType outputType, String description) {
-        super(
-                calcProgram,
-                AbstractProcessStreamOperator.class,
-                true,
-                inputEdge,
-                outputType,
-                description);
+        super(calcProgram, TableStreamOperator.class, true, inputEdge, outputType, description);
     }
 }
@@ -22,7 +22,7 @@
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate;
-import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -47,7 +47,7 @@ public StreamExecCorrelate(
                 joinType,
                 invocation,
                 condition,
-                AbstractProcessStreamOperator.class,
+                TableStreamOperator.class,
                 true,
                 inputEdge,
                 outputType,
@@ -30,7 +30,7 @@
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.utils.ScanUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -96,7 +96,7 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
         }
         CodeGeneratorContext ctx =
                 new CodeGeneratorContext(planner.getTableConfig())
-                        .setOperatorBaseClass(AbstractProcessStreamOperator.class);
+                        .setOperatorBaseClass(TableStreamOperator.class);
         transformation =
                 ScanUtil.convertToInternalRow(
                         ctx,
@@ -39,7 +39,7 @@
 import org.apache.flink.table.planner.plan.utils.ScanUtil;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
@@ -97,7 +97,7 @@ protected Transformation<RowData> createConversionTransformationIfNeeded(
 
         CodeGeneratorContext ctx =
                 new CodeGeneratorContext(planner.getTableConfig())
-                        .setOperatorBaseClass(AbstractProcessStreamOperator.class);
+                        .setOperatorBaseClass(TableStreamOperator.class);
         // the produced type may not carry the correct precision user defined in DDL, because
         // it may be converted from legacy type. Fix precision using logical schema from DDL.
         // Code generation requires the correct precision of input fields.
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
 
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.RexBuilder
 
 import java.util.Collections
 
@@ -92,6 +93,10 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
 
       override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory =
         context.getSqlExprToRexConverterFactory
+
+      override def getRexBuilder: RexBuilder = planner.getRelBuilder.getRexBuilder
+
+      override def needFinalTimeIndicatorConversion: Boolean = true
     })
   }
 
@@ -32,6 +32,7 @@ object FlinkBatchProgram {
   val SUBQUERY_REWRITE = "subquery_rewrite"
   val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
   val DECORRELATE = "decorrelate"
+  val TIME_INDICATOR = "time_indicator"
   val DEFAULT_REWRITE = "default_rewrite"
   val PREDICATE_PUSHDOWN = "predicate_pushdown"
   val JOIN_REORDER = "join_reorder"
@@ -96,6 +97,9 @@
     // query decorrelation
     chainedProgram.addLast(DECORRELATE, new FlinkDecorrelateProgram)
 
+    // convert time indicators
+    chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)
+
     // default rewrite, includes: predicate simplification, expression reduction, etc.
     chainedProgram.addLast(
       DEFAULT_REWRITE,
@@ -18,11 +18,22 @@
 
 package org.apache.flink.table.planner.plan.optimize.program
 
+import org.apache.calcite.rex.RexBuilder
 import org.apache.flink.table.planner.calcite.FlinkContext
 
 /**
  * A FlinkOptimizeContext allows to obtain table environment information when optimizing.
  */
 trait FlinkOptimizeContext extends FlinkContext {
+  /**
+   * Gets the Calcite [[RexBuilder]] defined in [[org.apache.flink.table.api.TableEnvironment]].
+   */
+  def getRexBuilder: RexBuilder
+
+  /**
+   * Returns true if the output node needs final TimeIndicator conversion
+   * defined in [[org.apache.flink.table.api.TableEnvironment#optimize]].
+   */
+  def needFinalTimeIndicatorConversion: Boolean
 
 }
@@ -25,10 +25,12 @@ import org.apache.calcite.rel.RelNode
 
 /**
  * A FlinkOptimizeProgram that deals with time.
+ *
+ * @tparam OC OptimizeContext
  */
-class FlinkRelTimeIndicatorProgram extends FlinkOptimizeProgram[StreamOptimizeContext] {
+class FlinkRelTimeIndicatorProgram[OC <: FlinkOptimizeContext] extends FlinkOptimizeProgram[OC] {
 
-  override def optimize(input: RelNode, context: StreamOptimizeContext): RelNode = {
+  override def optimize(input: RelNode, context: OC): RelNode = {
     val rexBuilder = Preconditions.checkNotNull(context.getRexBuilder)
     RelTimeIndicatorConverter.convert(input, rexBuilder, context.needFinalTimeIndicatorConversion)
   }
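
The type-parameter change above is what lets the batch pipeline reuse this program: FlinkOptimizeProgram is itself parameterized on its context type, so a program pinned to StreamOptimizeContext could not be chained into a batch program chain. A simplified sketch of the shapes involved (stubbed traits; the signatures here are assumptions for illustration, not part of this diff):

import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.rel.RelNode

// Stubbed context hierarchy: getRexBuilder and needFinalTimeIndicatorConversion
// now sit on the common trait (see the FlinkOptimizeContext diff above), so both
// stream and batch contexts can feed FlinkRelTimeIndicatorProgram.
trait FlinkOptimizeContext {
  def getRexBuilder: RexBuilder
  def needFinalTimeIndicatorConversion: Boolean
}
trait StreamOptimizeContext extends FlinkOptimizeContext
trait BatchOptimizeContext extends FlinkOptimizeContext

// Assumed shape of the program base trait.
trait FlinkOptimizeProgram[OC <: FlinkOptimizeContext] {
  def optimize(root: RelNode, context: OC): RelNode
}

// Before: extends FlinkOptimizeProgram[StreamOptimizeContext], usable only with
// a stream context. After: FlinkRelTimeIndicatorProgram[OC <: FlinkOptimizeContext]
// extends FlinkOptimizeProgram[OC], so it fits either chain.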
@@ -27,11 +27,6 @@ import org.apache.calcite.rex.RexBuilder
  */
 trait StreamOptimizeContext extends FlinkOptimizeContext {
 
-  /**
-   * Gets the Calcite [[RexBuilder]] defined in [[org.apache.flink.table.api.TableEnvironment]].
-   */
-  def getRexBuilder: RexBuilder
-
   /**
    * Returns true if the root is required to send UPDATE_BEFORE message with
    * UPDATE_AFTER message together for update changes.
@@ -43,10 +38,4 @@
    */
   def getMiniBatchInterval: MiniBatchInterval
 
-  /**
-   * Returns true if the output node needs final TimeIndicator conversion
-   * defined in [[org.apache.flink.table.api.TableEnvironment#optimize]].
-   */
-  def needFinalTimeIndicatorConversion: Boolean
-
 }
@@ -27,6 +27,7 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLog
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 
 import _root_.java.math.{BigDecimal => JBigDecimal}
 
@@ -48,9 +49,13 @@ class BatchLogicalWindowAggregateRule
   override private[table] def getOutAggregateGroupExpression(
       rexBuilder: RexBuilder,
       windowExpression: RexCall): RexNode = {
-
-    val literalType = windowExpression.getOperands.get(0).getType
-    rexBuilder.makeZeroLiteral(literalType)
+    // Create a literal with normal SqlTypeName.TIMESTAMP
+    // in case we reference a rowtime field.
+    rexBuilder.makeLiteral(
+      0L,
+      rexBuilder.getTypeFactory.createSqlType(
+        SqlTypeName.TIMESTAMP, windowExpression.getType.getPrecision),
+      true)
   }
 
   private[table] override def getTimeFieldReference(
@@ -65,6 +65,23 @@ LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)
       <![CDATA[
 Calc(select=[a, b, (a + 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e])
 +- TableSourceScan(table=[[default_catalog, default_database, c_watermark_t]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testDDLWithProctime">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM proctime_t]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[+($0, 1)], d=[TO_TIMESTAMP($1)], e=[my_udf($0)], ptime=[PROCTIME()])
++- LogicalTableScan(table=[[default_catalog, default_database, proctime_t]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, (a + 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e, PROCTIME_MATERIALIZE(PROCTIME()) AS ptime])
++- TableSourceScan(table=[[default_catalog, default_database, proctime_t]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
@@ -75,6 +75,23 @@ LogicalProject(a=[$0], c=[$2])
     <Resource name="optimized exec plan">
       <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSimpleProjectWithProctime">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c, PROCTIME() FROM ProjectableTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2], EXPR$2=[PROCTIME()])
++- LogicalTableScan(table=[[default_catalog, default_database, ProjectableTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, c, PROCTIME_MATERIALIZE(PROCTIME()) AS EXPR$2])
++- TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
+]]>
+    </Resource>
+  </TestCase>
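
A sketch of how plan-test resources like the ones above are typically exercised, assuming the blink planner's TableTestBase harness (the test class, connector options, and registration details are assumptions; only the query and the expected plans come from this diff):

import org.junit.Test

class ProctimeDDLBatchSketchTest extends TableTestBase {

  private val util = batchTestUtil()

  @Test
  def testDDLWithProctime(): Unit = {
    // Hypothetical DDL loosely mirroring the proctime_t resource: a proctime
    // attribute declared as a computed column.
    util.tableEnv.executeSql(
      """CREATE TABLE proctime_t (
        |  a INT,
        |  b STRING,
        |  ptime AS PROCTIME()
        |) WITH ('connector' = 'values', 'bounded' = 'true')""".stripMargin)

    // Compares the optimized exec plan against the XML resource, which shows
    // PROCTIME() materialized via PROCTIME_MATERIALIZE in batch mode.
    util.verifyExecPlan("SELECT * FROM proctime_t")
  }
}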