Skip to content

Commit

Permalink
[FLINK-21733][table-planner-blink] WatermarkAssigner incorrectly reco…
Browse files Browse the repository at this point in the history
…mputing the rowtime index which may cause ArrayIndexOutOfBoundsException (apache#15150)
  • Loading branch information
lincoln-lil committed Mar 12, 2021
1 parent 615d5d1 commit c2d9e69
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,7 @@ abstract class WatermarkAssigner(
}

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
val rowtimeFieldName = inputRel.getRowType.getFieldNames.get(rowtimeFieldIndex)
val newInputRel = inputs.get(0)
// the input fields maybe reordered, re-computed the rowtime index
val newIndex = newInputRel.getRowType.getFieldNames.indexOf(rowtimeFieldName)
copy(traitSet, newInputRel, newIndex, watermarkExpr)
copy(traitSet, inputs.get(0), rowtimeFieldIndex, watermarkExpr)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ LogicalProject(a=[$0], b=[$1])
<![CDATA[
Calc(select=[a, b], where=[(b > 10)])
+- TableSourceScan(table=[[default_catalog, default_database, VirtualTable, watermark=[-(+($2, 5000:INTERVAL SECOND), 5000:INTERVAL SECOND)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testProjectTransposeWatermarkAssigner">
<Resource name="sql">
<![CDATA[SELECT a, b, ts FROM t1]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], ts=[$5])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($5, 10000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], t=[$4], ts=[$4])
+- LogicalTableScan(table=[[default_catalog, default_database, t1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
TableSourceScan(table=[[default_catalog, default_database, t1, project=[a, b, t], watermark=[-($2, 10000:INTERVAL SECOND)]]], fields=[a, b, t])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,27 @@ class SourceWatermarkTest extends TableTestBase {
def testWatermarkWithMetadata(): Unit = {
util.verifyExecPlan("SELECT a, b FROM MyTable")
}

@Test
def testProjectTransposeWatermarkAssigner(): Unit = {
val sourceDDL =
s"""
|CREATE TEMPORARY TABLE `t1` (
| `a` VARCHAR,
| `b` VARCHAR,
| `c` VARCHAR,
| `d` INT,
| `t` TIMESTAMP(3),
| `ts` AS `t`,
| WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND
|) WITH (
| 'connector' = 'values',
| 'enable-watermark-push-down' = 'true',
| 'bounded' = 'false',
| 'disable-lookup' = 'true'
|)
""".stripMargin
util.tableEnv.executeSql(sourceDDL)
util.verifyExecPlan("SELECT a, b, ts FROM t1")
}
}

0 comments on commit c2d9e69

Please sign in to comment.