Skip to content

Commit

Permalink
[FLINK-5251] [table] Decouple StreamTableSourceScan from TableSourceT…
Browse files Browse the repository at this point in the history
…able.

This closes apache#2934.
  • Loading branch information
KurtYoung authored and fhueske committed Dec 6, 2016
1 parent 6f9633c commit 98d1826
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class DataStreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends StreamScan(cluster, traitSet, table, rowType) {
rowRelDataType: RelDataType)
extends StreamScan(cluster, traitSet, table) {

val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])

override def deriveRowType() = rowRelDataType

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamScan(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ import scala.collection.JavaConverters._
abstract class StreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowRelDataType: RelDataType)
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with DataStreamRel {

override def deriveRowType() = rowRelDataType

protected def convertToExpectedType(
input: DataStream[Any],
flinkTable: FlinkTable[_],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,31 @@ package org.apache.flink.api.table.plan.nodes.datastream

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.StreamTableEnvironment
import org.apache.flink.api.table.plan.schema.TableSourceTable
import org.apache.flink.api.table.sources.StreamTableSource
import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream

/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
class StreamTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends StreamScan(cluster, traitSet, table, rowType) {
tableSource: StreamTableSource[_])
extends StreamScan(cluster, traitSet, table) {

val tableSourceTable = table.unwrap(classOf[TableSourceTable])
val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
override def deriveRowType() = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
}

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new StreamTableSourceScan(
cluster,
traitSet,
table,
rowType
getTable,
tableSource
)
}

Expand All @@ -55,7 +56,7 @@ class StreamTableSourceScan(
val inputDataStream: DataStream[Any] = tableSource
.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]

convertToExpectedType(inputDataStream, tableSourceTable, expectedType, config)
convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class StreamTableSourceScanRule
val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)

// The original registered table source
val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]

new StreamTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
rel.getRowType
tableSource
)
}
}
Expand Down

0 comments on commit 98d1826

Please sign in to comment.