From 9260529b1b25c8ed1c08fcdb348a1336959d2477 Mon Sep 17 00:00:00 2001 From: Kurt Young Date: Mon, 5 Dec 2016 09:43:13 +0800 Subject: [PATCH] [FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceTable. This closes #2934. --- .../nodes/datastream/DataStreamScan.scala | 6 ++++-- .../plan/nodes/datastream/StreamScan.scala | 5 +---- .../datastream/StreamTableSourceScan.scala | 19 ++++++++++--------- .../StreamTableSourceScanRule.scala | 6 +++++- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala index 463e1bc0af3aa..da83b647dec00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala @@ -35,11 +35,13 @@ class DataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) - extends StreamScan(cluster, traitSet, table, rowType) { + rowRelDataType: RelDataType) + extends StreamScan(cluster, traitSet, table) { val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) + override def deriveRowType() = rowRelDataType + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamScan( cluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala index 17620d0d452e7..b13770e6e740d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala @@ -37,13 +37,10 @@ import scala.collection.JavaConverters._ abstract class StreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, - table: RelOptTable, - rowRelDataType: RelDataType) + table: RelOptTable) extends TableScan(cluster, traitSet, table) with DataStreamRel { - override def deriveRowType() = rowRelDataType - protected def convertToExpectedType( input: DataStream[Any], flinkTable: FlinkTable[_], diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala index 21b8a6310e6e1..8201070ce5756 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -20,11 +20,10 @@ package org.apache.flink.api.table.plan.nodes.datastream import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.StreamTableEnvironment import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.StreamTableSource +import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment} import org.apache.flink.streaming.api.datastream.DataStream /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ @@ -32,18 +31,20 @@ class StreamTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) - extends StreamScan(cluster, traitSet, table, rowType) { + tableSource: StreamTableSource[_]) + extends StreamScan(cluster, traitSet, table) { - val tableSourceTable = table.unwrap(classOf[TableSourceTable]) - val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]] + override def deriveRowType() = { + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) + } override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new StreamTableSourceScan( cluster, traitSet, - table, - rowType + getTable, + tableSource ) } @@ -55,7 +56,7 @@ class StreamTableSourceScan( val inputDataStream: DataStream[Any] = tableSource .getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config) + convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala index 9d8075c201a55..91dd255db0e76 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala @@ -58,11 +58,15 @@ class StreamTableSourceScanRule val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE) + // The original registered table source + val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable]) + val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]] + new StreamTableSourceScan( rel.getCluster, traitSet, scan.getTable, - rel.getRowType + tableSource ) } }