diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index ee71ce9dd78427..12dace4922c8fe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -72,6 +72,26 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } } + /** + * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory + * + * @param fieldNames field names + * @param fieldTypes field types, every element is Flink's [[TypeInformation]] + * @return a struct type with the input fieldNames and input fieldTypes + */ + def buildRowDataType( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]) + : RelDataType = { + val rowDataTypeBuilder = builder + fieldNames + .zip(fieldTypes) + .foreach { f => + rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true) + } + rowDataTypeBuilder.build + } + override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = { // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue // always set those to default value diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala index 15b2081544fc80..a6de2378548ff1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala @@ -19,7 +19,6 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan._ -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.api.common.typeinfo.TypeInformation @@ -35,13 +34,10 @@ import scala.collection.JavaConverters._ abstract class BatchScan( cluster: RelOptCluster, traitSet: RelTraitSet, - table: RelOptTable, - rowRelDataType: RelDataType) + table: RelOptTable) extends TableScan(cluster, traitSet, table) with DataSetRel { - override def deriveRowType() = rowRelDataType - override def toString: String = { s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index 10d95344d7316d..14da86296e8168 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -20,10 +20,9 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory} import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.BatchTableSource @@ -32,18 +31,20 @@ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) - extends BatchScan(cluster, traitSet, table, rowType) { + val tableSource: BatchTableSource[_]) + extends BatchScan(cluster, traitSet, table) { - val tableSourceTable = getTable.unwrap(classOf[TableSourceTable]) - val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]] + override def deriveRowType() = { + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes) + } override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new BatchTableSourceScan( cluster, traitSet, getTable, - getRowType + tableSource ) } @@ -54,6 +55,6 @@ class BatchTableSourceScan( val config = tableEnv.getConfig val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToExpectedType(inputDataSet, tableSourceTable, expectedType, config) + convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala index 3c34bc3839a87a..b7831368c80523 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala @@ -35,11 +35,13 @@ class DataSetScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) - extends BatchScan(cluster, traitSet, table, rowType) { + rowRelDataType: RelDataType) + extends BatchScan(cluster, traitSet, table) { val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]]) + override def deriveRowType() = rowRelDataType + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetScan( cluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala index 1a0d2a1e670c88..8e3d8bb159696b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchTableSourceScanRule.scala @@ -57,11 +57,13 @@ class BatchTableSourceScanRule val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource + .asInstanceOf[BatchTableSource[_]] new BatchTableSourceScan( rel.getCluster, traitSet, scan.getTable, - rel.getRowType + tableSource ) } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala index d95b513a540d46..84d6d7ead2fbc1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala @@ -60,13 +60,7 @@ abstract class FlinkTable[T]( override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - val builder = flinkTypeFactory.builder - fieldNames - .zip(fieldTypes) - .foreach { f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) - } - builder.build + flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes) } }