Skip to content

Commit

Permalink
[FLINK-22082][table-planner-blink] Nested projection push down doesn'…
Browse files Browse the repository at this point in the history
…t work for composite types, such as row(array(row))

This closes apache#15493
  • Loading branch information
fsk119 authored and godfreyhe committed Apr 13, 2021
1 parent ba5cc58 commit 2bbb1ba
Show file tree
Hide file tree
Showing 14 changed files with 438 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
(Option.empty, child)
}
}
case expr =>
// rewrite operands of the expression
val newExpr = expr.accept(this)
// rebuild FieldAccess
(
Some(
builder.makeFieldAccess(newExpr, fieldAccess.getField.getName, true)),
null)
}
}
}
Expand All @@ -269,18 +277,25 @@ private class NestedSchemaRewriter(schema: NestedSchema, builder: RexBuilder) ex
private class NestedSchemaExtractor(schema: NestedSchema) extends RexVisitorImpl[Unit](true) {

override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
def internalVisit(fieldAccess: RexFieldAccess): (Int, List[String]) = {
def internalVisit(fieldAccess: RexFieldAccess): (Boolean, Int, List[String]) = {
fieldAccess.getReferenceExpr match {
case ref: RexInputRef =>
(ref.getIndex, List(ref.getName, fieldAccess.getField.getName))
(true, ref.getIndex, List(ref.getName, fieldAccess.getField.getName))
case fac: RexFieldAccess =>
val (i, n) = internalVisit(fac)
(i, n :+ fieldAccess.getField.getName)
val (success, i, n) = internalVisit(fac)
(success, i, if (success) n :+ fieldAccess.getField.getName else null)
case expr =>
// only extract operands of the expression
expr.accept(this)
(false, -1, null)
}
}

// extract the info
val (index, names) = internalVisit(fieldAccess)
val (success, index, names) = internalVisit(fieldAccess)
if (!success) {
return
}

val topLevelNodeName = schema.inputRowType.getFieldNames.get(index)
val topLevelNode = if (!schema.columns.contains(topLevelNodeName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,23 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Uni
}

override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
def internalVisit(fieldAccess: RexFieldAccess): (Int, List[String]) = {
def internalVisit(fieldAccess: RexFieldAccess): (Boolean, Int, List[String]) = {
fieldAccess.getReferenceExpr match {
case ref: RexInputRef =>
(ref.getIndex, List(fieldAccess.getField.getName))
(true, ref.getIndex, List(fieldAccess.getField.getName))
case fac: RexFieldAccess =>
val (i, n) = internalVisit(fac)
(i, n :+ fieldAccess.getField.getName)
val (success, i, n) = internalVisit(fac)
(success, i, if (success) n :+ fieldAccess.getField.getName else null)
case expr =>
expr.accept(this)
(false, -1, null)
}
}

val (index, fullName) = internalVisit(fieldAccess)
val (success, index, fullName) = internalVisit(fieldAccess)
if (!success) {
return
}
val outputIndex = order.getOrElse(index, -1)
val fields: List[List[String]] = projectedFields(outputIndex)
projectedFields(outputIndex) = fields :+ fullName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,40 @@ public void setup() {
+ " 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'"
+ ")";
util().tableEnv().executeSql(ddl5);

String ddl6 =
"CREATE TABLE NestedItemTable (\n"
+ " `ID` INT,\n"
+ " `Timestamp` TIMESTAMP(3),\n"
+ " `Result` ROW<\n"
+ " `Mid` ROW<"
+ " `data_arr` ROW<`value` BIGINT> ARRAY,\n"
+ " `data_map` MAP<STRING, ROW<`value` BIGINT>>"
+ " >"
+ " >,\n"
+ " WATERMARK FOR `Timestamp` AS `Timestamp`\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'nested-projection-supported' = 'true',"
+ " 'bounded' = 'true'\n"
+ ")";
util().tableEnv().executeSql(ddl6);

String ddl7 =
"CREATE TABLE ItemTable (\n"
+ " `ID` INT,\n"
+ " `Timestamp` TIMESTAMP(3),\n"
+ " `Result` ROW<\n"
+ " `data_arr` ROW<`value` BIGINT> ARRAY,\n"
+ " `data_map` MAP<STRING, ROW<`value` BIGINT>>>,\n"
+ " `outer_array` ARRAY<INT>,\n"
+ " `outer_map` MAP<STRING, STRING>,\n"
+ " WATERMARK FOR `Timestamp` AS `Timestamp`\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'true'\n"
+ ")";
util().tableEnv().executeSql(ddl7);
}

@Test
Expand Down Expand Up @@ -168,4 +202,45 @@ public void testNestProjectWithUpsertSource() {

util().verifyRelPlan(sqlQuery);
}

@Test
public void testNestedProjectFieldAccessWithITEM() {
    // Plan check: ITEM (index/key) access on array and map fields nested under
    // a two-level ROW; verifies nested projection push-down handles them.
    final String sql =
            "SELECT "
                    + "`Result`.`Mid`.data_arr[ID].`value`, "
                    + "`Result`.`Mid`.data_map['item'].`value` "
                    + "FROM NestedItemTable";
    util().verifyRelPlan(sql);
}

@Test
public void testNestedProjectFieldAccessWithITEMWithConstantIndex() {
    // Plan check: constant-index ITEM access combined with selecting the
    // whole array field in the same query.
    final String sql =
            "SELECT "
                    + "`Result`.`Mid`.data_arr[2].`value`, "
                    + "`Result`.`Mid`.data_arr "
                    + "FROM NestedItemTable";
    util().verifyRelPlan(sql);
}

@Test
public void testNestedProjectFieldAccessWithITEMContainsTopLevelAccess() {
    // Plan check: mixes constant and dynamic ITEM accesses with a projection
    // of the enclosing row (`Result`.`Mid`) itself.
    final String sql =
            "SELECT "
                    + "`Result`.`Mid`.data_arr[2].`value`, "
                    + "`Result`.`Mid`.data_arr[ID].`value`, "
                    + "`Result`.`Mid`.data_map['item'].`value`, "
                    + "`Result`.`Mid` "
                    + "FROM NestedItemTable";
    util().verifyRelPlan(sql);
}

@Test
public void testProjectFieldAccessWithITEM() {
    // Plan check: ITEM access on fields of a table WITHOUT
    // nested-projection support, including top-level array/map columns.
    final String sql =
            "SELECT "
                    + "`Result`.data_arr[ID].`value`, "
                    + "`Result`.data_map['item'].`value`, "
                    + "`outer_array`[1], "
                    + "`outer_array`[ID], "
                    + "`outer_map`['item'] "
                    + "FROM ItemTable";
    util().verifyRelPlan(sql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,58 +40,63 @@ Calc(select=[id, deepNested_nested1_name AS nestedName, nested_value AS nestedVa
]]>
</Resource>
</TestCase>
<TestCase name="testNestProjectWithMetadata">
<TestCase name="testSimpleProject">
<Resource name="sql">
<![CDATA[
SELECT id,
deepNested.nested1 AS nested1,
deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as results
FROM T
]]>
<![CDATA[SELECT a, c FROM ProjectableTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
LogicalProject(a=[$0], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, ProjectableTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num, metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
]]>
</Resource>
</TestCase>
<TestCase name="testSimpleProject">
<TestCase name="testNestedProjectFieldWithITEM">
<Resource name="sql">
<![CDATA[SELECT a, c FROM ProjectableTable]]>
<![CDATA[
SELECT
`result`.`data_arr`[`id`].`value`,
`result`.`data_map`['item'].`value`
FROM NestedItemTable
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, ProjectableTable]])
LogicalProject(EXPR$0=[ITEM($2.data_arr, $0).value], EXPR$1=[ITEM($2.data_map, _UTF-16LE'item').value])
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
Calc(select=[ITEM(result_data_arr, id).value AS EXPR$0, ITEM(result_data_map, _UTF-16LE'item').value AS EXPR$1])
+- TableSourceScan(table=[[default_catalog, default_database, NestedItemTable, project=[result_data_arr, result_data_map, id]]], fields=[result_data_arr, result_data_map, id])
]]>
</Resource>
</TestCase>
<TestCase name="testSimpleProjectWithProctime">
<TestCase name="testNestProjectWithMetadata">
<Resource name="sql">
<![CDATA[SELECT a, c, PROCTIME() FROM ProjectableTable]]>
<![CDATA[
SELECT id,
deepNested.nested1 AS nested1,
deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as results
FROM T
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], c=[$2], EXPR$2=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, ProjectableTable]])
LogicalProject(id=[$0], nested1=[$1.nested1], results=[+(+($1.nested1.value, $1.nested2.num), $2)])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, c, PROCTIME_MATERIALIZE(PROCTIME()) AS EXPR$2])
+- TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
Calc(select=[id, deepNested_nested1 AS nested1, ((deepNested_nested1.value + deepNested_nested2_num) + metadata_1) AS results])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested_nested1, deepNested_nested2_num, metadata_1]]], fields=[id, deepNested_nested1, deepNested_nested2_num, metadata_1])
]]>
</Resource>
</TestCase>
Expand All @@ -112,6 +117,23 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+- TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[]]], fields=[])
]]>
</Resource>
</TestCase>
<TestCase name="testSimpleProjectWithProctime">
<Resource name="sql">
<![CDATA[SELECT a, c, PROCTIME() FROM ProjectableTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], c=[$2], EXPR$2=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, ProjectableTable]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, c, PROCTIME_MATERIALIZE(PROCTIME()) AS EXPR$2])
+- TableSourceScan(table=[[default_catalog, default_database, ProjectableTable, project=[a, c]]], fields=[a, c])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit 2bbb1ba

Please sign in to comment.