[FLINK-3640] Add support for SQL in DataSet programs
- add EnumerableToLogicalScan rule
- in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl
- create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner

This closes apache#1862
vasia committed Apr 11, 2016 · 1 parent b368cb2 · commit ed1e52a
Showing 15 changed files with 1,348 additions and 18 deletions.
JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.{RelCollations, RelNode}
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{FlinkPlannerImpl, TableConfig, Table}
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
@@ -56,7 +56,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
 
     // create table scan operator
     relBuilder.scan(tabName)
-    new Table(relBuilder.build(), relBuilder)
+    val relNode = relBuilder.build()
+    new Table(relNode, relBuilder)
   }
 
   override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
@@ -69,9 +70,7 @@
 
     // optimize the logical Flink plan
     val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-    val flinkOutputProps = RelTraitSet.createEmpty()
-      .plus(DataSetConvention.INSTANCE)
-      .plus(RelCollations.of()).simplify()
+    val flinkOutputProps = lPlan.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
 
     val dataSetPlan = try {
       optProgram.run(planner, decorPlan, flinkOutputProps)
@@ -97,4 +96,21 @@
 
   }
 
+  /**
+   * Parse, validate, and translate a SQL query into a [[Table]] wrapping the resulting RelNode.
+   */
+  def translateSQL(query: String): Table = {
+
+    val frameworkConfig = TranslationContext.getFrameworkConfig
+    val planner = new FlinkPlannerImpl(frameworkConfig, TranslationContext.getPlanner)
+    // parse the SQL query
+    val parsed = planner.parse(query)
+    // validate the SQL query
+    val validated = planner.validate(parsed)
+    // transform to a relational tree
+    val relational = planner.rel(validated)
+
+    new Table(relational.rel, TranslationContext.getRelBuilder)
+  }
+
 }
AbstractTableEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.table.JavaBatchTranslator
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.TranslationContext
 import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
@@ -83,4 +84,18 @@ class AbstractTableEnvironment {
     )
     TranslationContext.registerTable(dataSetTable, name)
   }
+
+  /**
+   * Execute a SQL query and retrieve the result as a [[Table]].
+   * All input [[Table]]s have to be registered in the
+   * [[org.apache.flink.api.java.table.TableEnvironment]] with unique names,
+   * using [[registerTable()]] or
+   * [[org.apache.flink.api.java.table.TableEnvironment.registerDataSet()]].
+   *
+   * @param query the SQL query
+   * @return the result of the SQL query as a [[Table]]
+   */
+  def sql(query: String): Table = {
+    new JavaBatchTranslator(config).translateSQL(query)
+  }
 }
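
A minimal usage sketch of the new sql() entry point. The table name, field list, and the wordCounts DataSet are illustrative assumptions, as is the String-based registerDataSet overload; they are not part of this commit.

// Illustrative sketch only, not from the diff.
val tableEnv = new TableEnvironment
// inputs must be registered under unique names before they can be queried
tableEnv.registerDataSet("WordCount", wordCounts, "word, frequency")
// the SQL result is an ordinary Table, so Table API calls can be chained onto it;
// this is the "mix Table API and SQL" scenario the shared planner enables
val result: Table = tableEnv
  .sql("SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word")
  .filter("frequency > 1")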
FlinkCalciteSqlValidator.scala (new file)
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.table

import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance}

/**
 * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
 */
class FlinkCalciteSqlValidator(
    opTab: SqlOperatorTable,
    catalogReader: CalciteCatalogReader,
    typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
      opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {

  override def getLogicalSourceRowType(
      sourceRowType: RelDataType,
      insert: SqlInsert): RelDataType = {
    typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType)
  }

  override def getLogicalTargetRowType(
      targetRowType: RelDataType,
      insert: SqlInsert): RelDataType = {
    typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
  }
}
FlinkPlannerImpl.scala (new file)
@@ -0,0 +1,180 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.table

import java.util

import com.google.common.collect.ImmutableList
import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.RelRoot
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
import org.apache.calcite.sql.validate.SqlValidator
import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig}
import org.apache.calcite.util.Util
import scala.collection.JavaConversions._

/**
 * NOTE: this is heavily inspired by Calcite's PlannerImpl.
 * We need it in order to share the planner between the Table API relational plans
 * and the SQL relational plans that are created by the Calcite parser.
 * The only difference is that we initialize the RelOptPlanner
 * when instantiating, instead of creating a new one in the ready() method.
 */
class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {

  val operatorTable: SqlOperatorTable = config.getOperatorTable
  /** Holds the trait definitions to be registered with planner. May be null. */
  val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
  val parserConfig: SqlParser.Config = config.getParserConfig
  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
  var defaultSchema: SchemaPlus = config.getDefaultSchema

  var typeFactory: JavaTypeFactory = null
  var validator: FlinkCalciteSqlValidator = null
  var validatedSqlNode: SqlNode = null
  var root: RelRoot = null

  private def ready() {
    Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] {
      def apply(
          cluster: RelOptCluster,
          relOptSchema: RelOptSchema,
          rootSchema: SchemaPlus): Unit = {

        Util.discard(rootSchema)
        typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory]
        if (planner == null) {
          planner = cluster.getPlanner
        }
      }
    }, config)
    if (this.traitDefs != null) {
      planner.clearRelTraitDefs()
      for (traitDef <- this.traitDefs) {
        planner.addRelTraitDef(traitDef)
      }
    }
  }

  @throws(classOf[SqlParseException])
  def parse(sql: String): SqlNode = {
    ready()
    val parser: SqlParser = SqlParser.create(sql, parserConfig)
    val sqlNode: SqlNode = parser.parseStmt
    sqlNode
  }

  @throws(classOf[ValidationException])
  def validate(sqlNode: SqlNode): SqlNode = {
    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
    validator.setIdentifierExpansion(true)
    try {
      validatedSqlNode = validator.validate(sqlNode)
    }
    catch {
      case e: RuntimeException =>
        throw new ValidationException(e)
    }
    validatedSqlNode
  }

  @throws(classOf[RelConversionException])
  def rel(sql: SqlNode): RelRoot = {
    assert(validatedSqlNode != null)
    val rexBuilder: RexBuilder = createRexBuilder
    val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
    val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
      new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable)
    sqlToRelConverter.setTrimUnusedFields(false)
    sqlToRelConverter.enableTableAccessConversion(false)
    root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
    root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
    root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
    root
  }

  /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
    * interface for [[org.apache.calcite.tools.Planner]]. */
  class ViewExpanderImpl extends ViewExpander {

    override def expandView(
        rowType: RelDataType,
        queryString: String,
        schemaPath: util.List[String]): RelRoot = {

      val parser: SqlParser = SqlParser.create(queryString, parserConfig)
      var sqlNode: SqlNode = null
      try {
        sqlNode = parser.parseQuery
      }
      catch {
        case e: SqlParseException =>
          throw new RuntimeException("parse failed", e)
      }
      val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
      val validator: SqlValidator =
        new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
      validator.setIdentifierExpansion(true)
      val validatedSqlNode: SqlNode = validator.validate(sqlNode)
      val rexBuilder: RexBuilder = createRexBuilder
      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable)
      sqlToRelConverter.setTrimUnusedFields(false)
      sqlToRelConverter.enableTableAccessConversion(false)
      root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
      FlinkPlannerImpl.this.root
    }
  }

  private def createCatalogReader: CalciteCatalogReader = {
    val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
    new CalciteCatalogReader(
      CalciteSchema.from(rootSchema),
      parserConfig.caseSensitive,
      CalciteSchema.from(defaultSchema).path(null),
      typeFactory)
  }

  private def createRexBuilder: RexBuilder = {
    new RexBuilder(typeFactory)
  }

}

object FlinkPlannerImpl {
  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
    if (schema.getParentSchema == null) {
      schema
    }
    else {
      rootSchema(schema.getParentSchema)
    }
  }
}
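
To make the planner sharing concrete, here is a short sketch of the two construction modes. The TranslationContext accessors are the ones used by translateSQL above; the query string is illustrative.

// Passing the Table API's existing planner means SQL plans are created against
// the same RelOptPlanner as Table API plans, so the two can be mixed freely.
val shared = new FlinkPlannerImpl(
  TranslationContext.getFrameworkConfig,
  TranslationContext.getPlanner)

// Passing null makes ready() fall back to a fresh planner obtained via
// Frameworks.withPlanner, matching Calcite's original PlannerImpl behavior.
val standalone = new FlinkPlannerImpl(TranslationContext.getFrameworkConfig, null)

// the three-step pipeline used by JavaBatchTranslator.translateSQL:
val root: RelRoot = shared.rel(shared.validate(shared.parse("SELECT * FROM SomeTable")))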
PlanTranslator.scala
@@ -73,8 +73,6 @@ abstract class PlanTranslator {
    */
   def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = {
 
-    val inputType = repr.getType()
-
     val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs)
     createTable(repr, fieldIndexes.toArray, fieldNames.toArray)
   }
(10 more changed files not shown.)
