[FLINK-3916] [table] Allow generic types passing the Table API
This closes apache#2197.
twalthr committed Jul 12, 2016
1 parent 971dcc5 commit 1b327f1
Showing 21 changed files with 439 additions and 164 deletions.
@@ -123,7 +123,7 @@ abstract class BatchTableEnvironment(
*/
override def sql(query: String): Table = {

val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(query)
// validate the sql query
@@ -21,7 +21,6 @@ package org.apache.flink.api.table
import java.util

import com.google.common.collect.ImmutableList
import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
@@ -30,47 +29,37 @@ import org.apache.calcite.rel.RelRoot
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
import org.apache.calcite.sql.parser.{SqlParseException, SqlParser}
import org.apache.calcite.sql.validate.SqlValidator
import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig}
import org.apache.calcite.util.Util
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException, ValidationException => CValidationException}

import scala.collection.JavaConversions._

/** NOTE: this is heavily insipred by Calcite's PlannerImpl.
We need it in order to share the planner between the Table API relational plans
and the SQL relation plans that are created by the Calcite parser.
The only difference is that we initialize the RelOptPlanner planner
when instantiating, instead of creating a new one in the ready() method. **/
class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
/**
* NOTE: this is heavily inspired by Calcite's PlannerImpl.
* We need it in order to share the planner between the Table API relational plans
* and the SQL relation plans that are created by the Calcite parser.
* The main difference is that we do not create a new RelOptPlanner in the ready() method.
*/
class FlinkPlannerImpl(
config: FrameworkConfig,
planner: RelOptPlanner,
typeFactory: FlinkTypeFactory) {

val operatorTable: SqlOperatorTable = config.getOperatorTable
/** Holds the trait definitions to be registered with planner. May be null. */
val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
val parserConfig: SqlParser.Config = config.getParserConfig
val convertletTable: SqlRexConvertletTable = config.getConvertletTable
var defaultSchema: SchemaPlus = config.getDefaultSchema
val defaultSchema: SchemaPlus = config.getDefaultSchema

var typeFactory: JavaTypeFactory = null
var validator: FlinkCalciteSqlValidator = null
var validatedSqlNode: SqlNode = null
var root: RelRoot = null

private def ready() {
Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] {
def apply(
cluster: RelOptCluster,
relOptSchema: RelOptSchema,
rootSchema: SchemaPlus): Unit = {

Util.discard(rootSchema)
typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory]
if (planner == null) {
planner = cluster.getPlanner
}
}
}, config)
if (this.traitDefs != null) {
planner.clearRelTraitDefs()
for (traitDef <- this.traitDefs) {
@@ -95,9 +84,8 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
validatedSqlNode = validator.validate(sqlNode)
}
catch {
case e: RuntimeException => {
case e: RuntimeException =>
throw new CValidationException(e)
}
}
validatedSqlNode
}
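
With this change, FlinkPlannerImpl no longer bootstraps its own type factory via Frameworks.withPlanner; the planner and the FlinkTypeFactory are injected by the caller. A minimal sketch of the new wiring, assuming a FlinkRelBuilder as created further below (the createPlanner helper is hypothetical; the three-argument constructor is the one introduced here):

```scala
import org.apache.calcite.tools.FrameworkConfig
import org.apache.flink.api.table.{FlinkPlannerImpl, FlinkRelBuilder}

object PlannerWiring {
  // Hypothetical helper: builds a SQL planner that reuses the RelOptPlanner and
  // FlinkTypeFactory already backing the Table API's relational plans.
  def createPlanner(config: FrameworkConfig, relBuilder: FlinkRelBuilder): FlinkPlannerImpl =
    new FlinkPlannerImpl(config, relBuilder.getPlanner, relBuilder.getTypeFactory)
}
```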
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table

import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema}
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.tools.Frameworks.PlannerAction
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}

/**
* Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
*/
class FlinkRelBuilder(
context: Context,
cluster: RelOptCluster,
relOptSchema: RelOptSchema)
extends RelBuilder(
context,
cluster,
relOptSchema) {

def getPlanner = cluster.getPlanner

def getCluster = cluster

override def getTypeFactory: FlinkTypeFactory =
super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
}

object FlinkRelBuilder {

def create(config: FrameworkConfig): FlinkRelBuilder = {
// prepare planner and collect context instances
val clusters: Array[RelOptCluster] = Array(null)
val relOptSchemas: Array[RelOptSchema] = Array(null)
val rootSchemas: Array[SchemaPlus] = Array(null)
Frameworks.withPlanner(new PlannerAction[Void] {
override def apply(
cluster: RelOptCluster,
relOptSchema: RelOptSchema,
rootSchema: SchemaPlus)
: Void = {
clusters(0) = cluster
relOptSchemas(0) = relOptSchema
rootSchemas(0) = rootSchema
null
}
})
val planner = clusters(0).getPlanner
val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]

// create Flink type factory
val typeSystem = config.getTypeSystem
val typeFactory = new FlinkTypeFactory(typeSystem)

// create context instances with Flink type factory
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
val relOptSchema = new CalciteCatalogReader(
calciteSchema,
config.getParserConfig.caseSensitive(),
defaultRelOptSchema.getSchemaName,
typeFactory)

new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
}

}
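
FlinkRelBuilder.create plays the role of RelBuilder.create but swaps the cluster's RexBuilder for one backed by a FlinkTypeFactory, so every RelNode built through it types its expressions with the Flink factory. A sketch of standalone use, under the assumption that a bare Frameworks config with a fresh root schema suffices (TableEnvironment builds the real config with lex rules, operator table and cost factory):

```scala
import org.apache.calcite.tools.Frameworks
import org.apache.flink.api.table.FlinkRelBuilder

object RelBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Assumed minimal configuration; builder defaults fill in parser config and type system.
    val config = Frameworks.newConfigBuilder()
      .defaultSchema(Frameworks.createRootSchema(true))
      .build()

    val relBuilder = FlinkRelBuilder.create(config)
    // The override guarantees a FlinkTypeFactory, not a plain JavaTypeFactory.
    val typeFactory = relBuilder.getTypeFactory
    println(typeFactory.getClass.getSimpleName) // FlinkTypeFactory
  }
}
```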
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table

import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.api.table.plan.schema.GenericRelDataType
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple

import scala.collection.mutable

/**
* Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
* and Calcite's [[RelDataType]].
*/
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {

private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()

def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
// simple type can be converted to SQL types and vice versa
if (isSimple(typeInfo)) {
createSqlType(typeInfoToSqlTypeName(typeInfo))
}
// advanced types require specific RelDataType
// for storing the original TypeInformation
else {
seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
}
}

private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
// TODO add specific RelDataTypes
// for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
case ti: TypeInformation[_] =>
new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])

case ti@_ =>
throw new TableException(s"Unsupported type information: $ti")
}
}

object FlinkTypeFactory {

private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
case BOOLEAN_TYPE_INFO => BOOLEAN
case BYTE_TYPE_INFO => TINYINT
case SHORT_TYPE_INFO => SMALLINT
case INT_TYPE_INFO => INTEGER
case LONG_TYPE_INFO => BIGINT
case FLOAT_TYPE_INFO => FLOAT
case DOUBLE_TYPE_INFO => DOUBLE
case STRING_TYPE_INFO => VARCHAR
case BIG_DEC_TYPE_INFO => DECIMAL

// date/time types
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP

case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
throw new TableException("Character type is not supported.")

case _@t =>
throw new TableException(s"Type is not supported: $t")
}

def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
case SMALLINT => SHORT_TYPE_INFO
case INTEGER => INT_TYPE_INFO
case BIGINT => LONG_TYPE_INFO
case FLOAT => FLOAT_TYPE_INFO
case DOUBLE => DOUBLE_TYPE_INFO
case VARCHAR | CHAR => STRING_TYPE_INFO
case DECIMAL => BIG_DEC_TYPE_INFO

// date/time types
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH =>
throw new TableException("Intervals are not supported yet.")

case NULL =>
throw new TableException("Type NULL is not supported. " +
"Null values must have a supported type.")

// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
// are represented as integer
case SYMBOL => INT_TYPE_INFO

// extract encapsulated TypeInformation
case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
genericRelDataType.typeInfo

case _@t =>
throw new TableException(s"Type is not supported: $t")
}
}
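
Simple types map symmetrically, so a TypeInformation survives the round trip through Calcite. A sketch using Calcite's default type system for brevity (an assumption: generic types additionally need the FlinkTypeSystem added in this patch, because createAdvancedType casts to it):

```scala
import org.apache.calcite.rel.`type`.RelDataTypeSystem
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.FlinkTypeFactory

object TypeMappingSketch {
  def main(args: Array[String]): Unit = {
    val typeFactory = new FlinkTypeFactory(RelDataTypeSystem.DEFAULT)

    // INT_TYPE_INFO -> SQL INTEGER -> INT_TYPE_INFO
    val intRelType = typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO)
    println(FlinkTypeFactory.toTypeInfo(intRelType) == BasicTypeInfo.INT_TYPE_INFO) // true
  }
}
```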
@@ -124,7 +124,7 @@ abstract class StreamTableEnvironment(
*/
override def sql(query: String): Table = {

val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(query)
// validate the sql query
@@ -21,26 +21,23 @@ package org.apache.flink.api.table
import java.util.concurrent.atomic.AtomicInteger

import org.apache.calcite.config.Lex
import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}

import org.apache.calcite.tools.{FrameworkConfig, Frameworks}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv}
import org.apache.flink.api.java.table.{StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv}
import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
import org.apache.flink.api.table.sinks.TableSink
import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable}
import org.apache.flink.api.table.sinks.TableSink
import org.apache.flink.api.table.validate.FunctionCatalog
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
@@ -73,16 +70,12 @@ abstract class TableEnvironment(val config: TableConfig) {
.build

// the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
protected val relBuilder: RelBuilder = RelBuilder.create(frameworkConfig)

private val cluster: RelOptCluster = relBuilder
.values(Array("dummy"), new Integer(1))
.build().getCluster
protected val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)

// the planner instance used to optimize queries of this TableEnvironment
private val planner: RelOptPlanner = cluster.getPlanner
private val planner: RelOptPlanner = relBuilder.getPlanner

private val typeFactory: RelDataTypeFactory = cluster.getTypeFactory
private val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory

private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns

@@ -200,16 +193,21 @@ abstract class TableEnvironment(val config: TableConfig) {
"TMP_" + attrNameCntr.getAndIncrement()
}

/** Returns the [[RelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: RelBuilder = {
/** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
private[flink] def getRelBuilder: FlinkRelBuilder = {
relBuilder
}

/** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
protected def getPlanner: RelOptPlanner = {
private[flink] def getPlanner: RelOptPlanner = {
planner
}

/** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
private[flink] def getTypeFactory: FlinkTypeFactory = {
typeFactory
}

private[flink] def getFunctionCatalog: FunctionCatalog = {
functionCatalog
}
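
The net effect of the TableEnvironment changes: builder, planner and type factory are no longer assembled independently (the old code derived a cluster from a dummy values() relation) but all stem from one FlinkRelBuilder, which is what lets a GenericRelDataType created on the Table API side be recognized again during SQL validation. A condensed sketch of that wiring (names are illustrative):

```scala
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.tools.FrameworkConfig
import org.apache.flink.api.table.{FlinkRelBuilder, FlinkTypeFactory}

object SharedWiring {
  // One FlinkRelBuilder is the single source of planner and type factory,
  // mirroring the three fields of TableEnvironment above.
  def sharedComponents(config: FrameworkConfig): (RelOptPlanner, FlinkTypeFactory) = {
    val relBuilder = FlinkRelBuilder.create(config)
    (relBuilder.getPlanner, relBuilder.getTypeFactory)
  }
}
```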