[FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes (apache#7910)

* [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes

This commit includes most of the Flink logical relational nodes; the rest will be introduced in a follow-up commit.

* add FlinkLogicalTableFunctionScan
* add FlinkLogicalTableSourceScan
* move RankRange into Rank.scala file
godfreyhe authored and KurtYoung committed Mar 7, 2019
1 parent 953a5ff commit fd04309
Showing 41 changed files with 3,288 additions and 8 deletions.
@@ -19,13 +19,13 @@
package org.apache.flink.table.calcite

 import org.apache.flink.table.`type`.{ArrayType, DecimalType, InternalType, InternalTypes, MapType, RowType}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema}
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema, TimeIndicatorRelDataType}

 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
-import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}

import java.util

@@ -91,6 +91,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
createTypeWithNullability(relType, isNullable)
}

/**
* Creates an indicator type for event-time, with properties similar to a SQL timestamp.
*/
def createRowtimeIndicatorType(): RelDataType = {
val originalType = createTypeFromInternalType(InternalTypes.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
originalType.asInstanceOf[BasicSqlType],
isEventTime = true)
)
}

/**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
@@ -129,6 +142,49 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
logicalRowTypeBuilder.build
}

/**
* Creates a struct type with the input table schema using FlinkTypeFactory.
*
* @param tableSchema the table schema
* @param isStreaming whether the table is for a streaming query; when it is Some(false)
*                    (batch mode), time indicator fields degrade to regular timestamps
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = {
buildRelDataType(
tableSchema.getFieldNames.toSeq,
tableSchema.getFieldTypes map {
case InternalTypes.PROCTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get =>
InternalTypes.TIMESTAMP
case InternalTypes.ROWTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get =>
InternalTypes.TIMESTAMP
case tpe: InternalType => tpe
})
}

def buildRelDataType(
fieldNames: Seq[String],
fieldTypes: Seq[InternalType]): RelDataType = {
buildRelDataType(
fieldNames,
fieldTypes,
fieldTypes.map(!FlinkTypeFactory.isTimeIndicatorType(_)))
}

def buildRelDataType(
fieldNames: Seq[String],
fieldTypes: Seq[InternalType],
fieldNullables: Seq[Boolean]): RelDataType = {
val b = builder
val fields = fieldNames.zip(fieldTypes).zip(fieldNullables)
fields foreach {
case ((fieldName, fieldType), fieldNullable) =>
if (FlinkTypeFactory.isTimeIndicatorType(fieldType) && fieldNullable) {
throw new TableException(
s"$fieldName can not be nullable because it is TimeIndicatorType!")
}
b.add(fieldName, createTypeFromInternalType(fieldType, fieldNullable))
}
b.build
}

// ----------------------------------------------------------------------------------------------

override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = {
@@ -221,6 +277,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl

object FlinkTypeFactory {

def isTimeIndicatorType(t: InternalType): Boolean = t match {
case InternalTypes.ROWTIME_INDICATOR | InternalTypes.PROCTIME_INDICATOR => true
case _ => false
}

def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
case BOOLEAN => InternalTypes.BOOLEAN
case TINYINT => InternalTypes.BYTE
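To see how these pieces fit together, here is a minimal usage sketch. It is illustrative only and not part of the commit: the FlinkTypeFactoryExample wrapper and the choice of Calcite's default RelDataTypeSystem are assumptions.

import org.apache.calcite.rel.`type`.RelDataTypeSystem

import org.apache.flink.table.`type`.InternalTypes
import org.apache.flink.table.calcite.FlinkTypeFactory

object FlinkTypeFactoryExample {
  def main(args: Array[String]): Unit = {
    val typeFactory = new FlinkTypeFactory(RelDataTypeSystem.DEFAULT)

    // Time-indicator detection is what drives the default nullability in
    // buildRelDataType: indicator fields must stay non-nullable.
    assert(FlinkTypeFactory.isTimeIndicatorType(InternalTypes.ROWTIME_INDICATOR))

    // Build a two-field row type; the rowtime field keeps its indicator type
    // and is created as non-nullable, while the plain timestamp stays nullable.
    val rowType = typeFactory.buildRelDataType(
      Seq("ts", "rowtime"),
      Seq(InternalTypes.TIMESTAMP, InternalTypes.ROWTIME_INDICATOR))
    println(rowType.getFullTypeString)

    // An event-time indicator that otherwise behaves like a SQL TIMESTAMP.
    println(typeFactory.createRowtimeIndicatorType())
  }
}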
@@ -18,11 +18,100 @@

package org.apache.flink.table.plan.nodes

import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlAsOperator
import org.apache.calcite.sql.SqlKind._

import scala.collection.JavaConversions._

/**
* Base class for Flink relational expressions.
*/
trait FlinkRelNode extends RelNode {

private[flink] def getExpressionString(
expr: RexNode,
inFields: List[String],
localExprsTable: Option[List[RexNode]]): String = {
getExpressionString(expr, inFields, localExprsTable, ExpressionFormat.Prefix)
}

private[flink] def getExpressionString(
expr: RexNode,
inFields: List[String],
localExprsTable: Option[List[RexNode]],
expressionFormat: ExpressionFormat): String = {

expr match {
case pr: RexPatternFieldRef =>
val alpha = pr.getAlpha
val field = inFields.get(pr.getIndex)
s"$alpha.$field"

case i: RexInputRef =>
inFields.get(i.getIndex)

case l: RexLiteral =>
l.toString

case l: RexLocalRef if localExprsTable.isEmpty =>
throw new IllegalArgumentException("Encountered RexLocalRef without " +
"local expression table")

case l: RexLocalRef =>
val lExpr = localExprsTable.get(l.getIndex)
getExpressionString(lExpr, inFields, localExprsTable, expressionFormat)

case c: RexCall =>
val op = c.getOperator.toString
val ops = c.getOperands.map(
getExpressionString(_, inFields, localExprsTable, expressionFormat))
c.getOperator match {
case _ : SqlAsOperator => ops.head
case _ =>
expressionFormat match {
case ExpressionFormat.Infix if ops.size() == 1 =>
val operand = ops.head
c.getKind match {
case IS_FALSE | IS_NOT_FALSE | IS_TRUE | IS_NOT_TRUE | IS_UNKNOWN | IS_NULL |
IS_NOT_NULL => s"$operand $op"
case _ => s"$op($operand)"
}
case ExpressionFormat.Infix => s"(${ops.mkString(s" $op ")})"
case ExpressionFormat.PostFix => s"(${ops.mkString(", ")})$op"
case ExpressionFormat.Prefix => s"$op(${ops.mkString(", ")})"
}
}

case fa: RexFieldAccess =>
val referenceExpr = getExpressionString(
fa.getReferenceExpr,
inFields,
localExprsTable,
expressionFormat)
val field = fa.getField.getName
s"$referenceExpr.$field"
case cv: RexCorrelVariable =>
cv.toString
case _ =>
throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
}
}

}

/**
* Infix, Postfix and Prefix notations are three different but equivalent ways of writing
* expressions. It is easiest to demonstrate the differences by looking at examples of operators
* that take two operands.
* Infix notation: (X + Y)
* Postfix notation: (X Y) +
* Prefix notation: + (X Y)
*/
object ExpressionFormat extends Enumeration {
type ExpressionFormat = Value
val Infix, PostFix, Prefix = Value
}
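To make the three notations concrete, the standalone sketch below mirrors the multi-operand formatting branch of getExpressionString on plain strings. The render helper is hypothetical: it sidesteps Calcite's RexNode types and the unary IS_* special cases.

import org.apache.flink.table.plan.nodes.ExpressionFormat

object ExpressionFormatDemo {
  // Hypothetical mini-formatter mirroring the RexCall branch above.
  def render(
      op: String,
      operands: Seq[String],
      format: ExpressionFormat.ExpressionFormat): String = format match {
    case ExpressionFormat.Infix => s"(${operands.mkString(s" $op ")})"
    case ExpressionFormat.PostFix => s"(${operands.mkString(", ")})$op"
    case ExpressionFormat.Prefix => s"$op(${operands.mkString(", ")})"
  }

  def main(args: Array[String]): Unit = {
    println(render("+", Seq("X", "Y"), ExpressionFormat.Infix))   // (X + Y)
    println(render("+", Seq("X", "Y"), ExpressionFormat.PostFix)) // (X, Y)+
    println(render("+", Seq("X", "Y"), ExpressionFormat.Prefix))  // +(X, Y)
  }
}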
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.calcite

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.calcite.util.Litmus

import java.util

import scala.collection.JavaConversions._

/**
* Relational expression that applies a number of projects to every input row,
* hence producing multiple output rows from each input row.
*
* <p>Values of expand_id should be unique.
*
* @param cluster cluster that this relational expression belongs to
* @param traits the traits of this rel
* @param input input relational expression
* @param outputRowType output row type
* @param projects all projects, each project contains list of expressions for
* the output columns
* @param expandIdIndex expand_id('$e') field index
*/
abstract class Expand(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
outputRowType: RelDataType,
val projects: util.List[util.List[RexNode]],
val expandIdIndex: Int)
extends SingleRel(cluster, traits, input) {

isValid(Litmus.THROW, null)

override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
if (projects.size() <= 1) {
return litmus.fail("Expand should output more than one rows, otherwise use Project.")
}
if (projects.exists(_.size != outputRowType.getFieldCount)) {
return litmus.fail("project filed count is not equal to output field count.")
}
if (expandIdIndex < 0 || expandIdIndex >= outputRowType.getFieldCount) {
return litmus.fail(
"expand_id field index should be greater than 0 and less than output field count.")
}
val expandIdValues = new util.HashSet[Any]()
for (project <- projects) {
project.get(expandIdIndex) match {
case literal: RexLiteral => expandIdValues.add(literal.getValue)
case _ => return litmus.fail("expand_id value should be a non-null literal.")
}
}
if (expandIdValues.size() != projects.size()) {
return litmus.fail("values of expand_id should be unique.")
}
litmus.succeed()
}

override def deriveRowType(): RelDataType = outputRowType

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val rowCnt = mq.getRowCount(this.getInput) * projects.size()
planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}

override def estimateRowCount(mq: RelMetadataQuery): Double = {
val childRowCnt = mq.getRowCount(this.getInput)
if (childRowCnt != null) {
childRowCnt * projects.size()
} else {
null.asInstanceOf[Double]
}
}
}
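One way to picture the contract: for GROUP BY GROUPING SETS ((a), (b)), a planner can use Expand to emit one copy of each input row per grouping set, each copy tagged with a distinct expand_id. The sketch below models that expansion on plain tuples; it illustrates the row-multiplication semantics only and is not the RexNode-based implementation.

object ExpandSemanticsDemo {
  def main(args: Array[String]): Unit = {
    val input = Seq(("a1", "b1"), ("a2", "b2"))
    // Two projects, mirroring GROUPING SETS ((a), (b)); the last column is expand_id.
    val expanded = input.flatMap { case (a, b) =>
      Seq(
        (Some(a), None, 1L), // grouping set (a): keep a, null out b, expand_id = 1
        (None, Some(b), 2L)) // grouping set (b): null out a, keep b, expand_id = 2
    }
    expanded.foreach(println)
    // The output row count grows by the number of projects, which is exactly
    // what estimateRowCount above computes.
    assert(expanded.size == input.size * 2)
  }
}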
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.calcite

import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}

import java.util

import scala.collection.JavaConversions._

/**
* Sub-class of [[Expand]]: a relational expression that returns multiple rows
* expanded from each input row. This class corresponds to a Calcite logical rel.
*/
final class LogicalExpand(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
outputRowType: RelDataType,
projects: util.List[util.List[RexNode]],
expandIdIndex: Int)
extends Expand(cluster, traits, input, outputRowType, projects, expandIdIndex) {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new LogicalExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val names = outputRowType.getFieldNames
val terms = projects.map {
project =>
project.zipWithIndex.map {
case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
case (o, _) => s"$o"
}.mkString("{", ", ", "}")
}.mkString(", ")
super.explainTerms(pw).item("projects", terms)
}
}

object LogicalExpand {
def create(
input: RelNode,
outputRowType: RelDataType,
projects: util.List[util.List[RexNode]],
expandIdIndex: Int): LogicalExpand = {
val traits = input.getCluster.traitSetOf(Convention.NONE)
new LogicalExpand(input.getCluster, traits, input, outputRowType, projects, expandIdIndex)
}
}
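Given explainTerms above, a plan dump for the grouping-sets example would render the projects roughly as follows (illustrative output, assuming output fields a, b and an expand_id field named $e):

LogicalExpand(projects=[{a=[$0], b=[null], $e=[1]}, {a=[null], b=[$1], $e=[2]}])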
