Skip to content

Commit

Permalink
[FLINK-16827][table-planner-blink] StreamExecTemporalSort should requ…
Browse files Browse the repository at this point in the history
…ire a distribution trait in StreamExecTemporalSortRule.

This closes apache#11643
  • Loading branch information
libenchao committed Jul 13, 2020
1 parent 3865f7b commit 66353f2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalSort
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution

import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelFieldCollation.Direction
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
Expand All @@ -46,12 +47,18 @@ class StreamExecTemporalSortRule
override def convert(rel: RelNode): RelNode = {
val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
val input = sort.getInput()
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
val convInput: RelNode = RelOptRule.convert(input, FlinkConventions.STREAM_PHYSICAL)
val requiredTraitSet = input.getTraitSet
.replace(FlinkRelDistribution.SINGLETON)
.replace(FlinkConventions.STREAM_PHYSICAL)
val providedTraitSet = sort.getTraitSet
.replace(FlinkRelDistribution.SINGLETON)
.replace(FlinkConventions.STREAM_PHYSICAL)

val convInput: RelNode = RelOptRule.convert(input, requiredTraitSet)

new StreamExecTemporalSort(
rel.getCluster,
traitSet,
providedTraitSet,
convInput,
sort.collation)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- TemporalSort(orderBy=[proctime ASC, c ASC])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -136,7 +137,8 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a])
+- TemporalSort(orderBy=[rowtime ASC, c ASC])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- Exchange(distribution=[single])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,49 @@ class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestB
assertEquals(expected, sink.getRetractResults)
}

@Test
def testEventTimeOrderByWithParallelInput(): Unit = {
val data = List(
(3L, 2L, "Hello world", 3),
(2L, 2L, "Hello", 2),
(6L, 3L, "Luke Skywalker", 6),
(5L, 3L, "I am fine.", 5),
(7L, 4L, "Comment#1", 7),
(9L, 4L, "Comment#3", 9),
(10L, 4L, "Comment#4", 10),
(8L, 4L, "Comment#2", 8),
(1L, 1L, "Hi", 1),
(4L, 3L, "Helloworld, how are you?", 4))

val t = failingDataSource(data)
.assignTimestampsAndWatermarks(
new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
.setParallelism(env.getParallelism)
.toTable(tEnv, 'rowtime.rowtime, 'key, 'str, 'int)
tEnv.registerTable("T", t)

val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime"

val sink = new TestingRetractSink
val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
results.addSink(sink).setParallelism(1)
env.execute()

val expected = Seq(
"1,Hi,1",
"2,Hello,2",
"2,Hello world,3",
"3,Helloworld, how are you?,4",
"3,I am fine.,5",
"3,Luke Skywalker,6",
"4,Comment#1,7",
"4,Comment#2,8",
"4,Comment#3,9",
"4,Comment#4,10")

assertEquals(expected, sink.getRetractResults)
}

@Test
def testEventTimeAndOtherFieldOrderBy(): Unit = {
val data = List(
Expand Down

0 comments on commit 66353f2

Please sign in to comment.