Skip to content

Commit

Permalink
[FLINK-5158] [table] Refactor BatchScan, DataSetScan, and BatchTableS…
Browse files Browse the repository at this point in the history
…ourceScan.

This closes apache#2921.
  • Loading branch information
beyond1920 authored and static-max committed Dec 13, 2016
1 parent 720c5f6 commit bdd508e
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}

/**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @return a struct type with the input fieldNames and input fieldTypes
*/
def buildRowDataType(
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]])
: RelDataType = {
val rowDataTypeBuilder = builder
fieldNames
.zip(fieldTypes)
.foreach { f =>
rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
}
rowDataTypeBuilder.build
}

override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// always set those to default value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand All @@ -35,13 +34,10 @@ import scala.collection.JavaConverters._
abstract class BatchScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowRelDataType: RelDataType)
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
with DataSetRel {

override def deriveRowType() = rowRelDataType

override def toString: String = {
s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ package org.apache.flink.api.table.plan.nodes.dataset

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
import org.apache.flink.api.table.plan.schema.TableSourceTable
import org.apache.flink.api.table.sources.BatchTableSource

Expand All @@ -32,18 +31,20 @@ class BatchTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
val tableSource: BatchTableSource[_])
extends BatchScan(cluster, traitSet, table) {

val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]]
override def deriveRowType() = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
}

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new BatchTableSourceScan(
cluster,
traitSet,
getTable,
getRowType
tableSource
)
}

Expand All @@ -54,6 +55,6 @@ class BatchTableSourceScan(
val config = tableEnv.getConfig
val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]

convertToExpectedType(inputDataSet, tableSourceTable, expectedType, config)
convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class DataSetScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
rowRelDataType: RelDataType)
extends BatchScan(cluster, traitSet, table) {

val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])

override def deriveRowType() = rowRelDataType

override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetScan(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ class BatchTableSourceScanRule
val scan: TableScan = rel.asInstanceOf[TableScan]
val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)

val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
.asInstanceOf[BatchTableSource[_]]
new BatchTableSourceScan(
rel.getCluster,
traitSet,
scan.getTable,
rel.getRowType
tableSource
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ abstract class FlinkTable[T](

override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
val builder = flinkTypeFactory.builder
fieldNames
.zip(fieldTypes)
.foreach { f =>
builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
}
builder.build
flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
}

}

0 comments on commit bdd508e

Please sign in to comment.