Skip to content

Commit

Permalink
[FLINK-22166][table] Empty values with sort willl fail
Browse files Browse the repository at this point in the history
This closes apache#15571
  • Loading branch information
JingsongLi committed Apr 13, 2021
1 parent a2c429d commit 0e82a99
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ object FlinkLogicalRelFactories {
rowType: RelDataType,
tuples: util.List[ImmutableList[RexLiteral]]): RelNode = {
FlinkLogicalValues.create(
cluster, rowType, ImmutableList.copyOf[ImmutableList[RexLiteral]](tuples))
cluster, null, rowType, ImmutableList.copyOf[ImmutableList[RexLiteral]](tuples))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private class FlinkLogicalValuesConverter

override def convert(rel: RelNode): RelNode = {
val values = rel.asInstanceOf[LogicalValues]
FlinkLogicalValues.create(rel.getCluster, values.getRowType, values.getTuples())
FlinkLogicalValues.create(
rel.getCluster, values.getTraitSet, values.getRowType, values.getTuples())
}
}

Expand All @@ -77,13 +78,31 @@ object FlinkLogicalValues {

def create(
cluster: RelOptCluster,
traitSet: RelTraitSet,
rowType: RelDataType,
tuples: ImmutableList[ImmutableList[RexLiteral]]): FlinkLogicalValues = {
val mq = cluster.getMetadataQuery
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).replaceIfs(
RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() {
def get: util.List[RelCollation] = RelMdCollation.values(mq, rowType, tuples)
}).simplify()
new FlinkLogicalValues(cluster, traitSet, rowType, tuples)
var newTraitSet = cluster.traitSetOf(FlinkConventions.LOGICAL)
if (tuples.isEmpty && traitSet != null) {
// ReduceExpressionsRule will produce emtpy values
// And PruneEmptyRules will remove sort rel node
// So there will be a empty values with useless collation, in this case we need keep
// original collation to make this conversion success.
newTraitSet = newTraitSet.replaceIf(
RelCollationTraitDef.INSTANCE.asInstanceOf[RelTraitDef[RelTrait]],
new Supplier[RelTrait]() {
def get: RelTrait = {
traitSet.getTrait(RelCollationTraitDef.INSTANCE)
}
})
} else {
newTraitSet = newTraitSet.replaceIfs(
RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() {
def get: util.List[RelCollation] = {
RelMdCollation.values(mq, rowType, tuples)
}
})
}
new FlinkLogicalValues(cluster, newTraitSet.simplify(), rowType, tuples)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, D
: +- Values(tuples=[[{ 0 }]], values=[ZERO]), rowType=[RecordType(INTEGER ZERO)]
+- Calc(select=[3 AS EXPR$0, 4.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
+- Values(tuples=[[{ 0 }]], values=[ZERO]), rowType=[RecordType(INTEGER ZERO)]
]]>
</Resource>
</TestCase>
<TestCase name="testEmptyValuesWithSort">
<Resource name="sql">
<![CDATA[SELECT * FROM (VALUES 1, 2, 3) AS T (a) WHERE a = 1 and a = 2 ORDER BY a]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
+- LogicalProject(a=[$0])
+- LogicalFilter(condition=[AND(=($0, 1), =($0, 2))])
+- LogicalValues(tuples=[[{ 1 }, { 2 }, { 3 }]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Values(tuples=[[]], values=[a])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ class ValuesTest extends TableTestBase {
util.verifyRelPlanWithType("SELECT * FROM (VALUES (1, 2.0), (3, CAST(4 AS BIGINT))) AS T(a, b)")
}

@Test
def testEmptyValuesWithSort(): Unit = {
util.verifyExecPlan("SELECT * FROM (VALUES 1, 2, 3) AS T (a) WHERE a = 1 and a = 2 ORDER BY a")
}

}

0 comments on commit 0e82a99

Please sign in to comment.