Skip to content

Commit

Permalink
[FLINK-3754] [tableAPI] Add validation phase to Table API before cons…
Browse files Browse the repository at this point in the history
…truction of RelNodes.

This closes apache#1958
  • Loading branch information
yjshen authored and fhueske committed May 17, 2016
1 parent f2e6057 commit f0d543f
Show file tree
Hide file tree
Showing 51 changed files with 1,876 additions and 726 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/
package org.apache.flink.api.scala.table

import scala.language.implicitConversions

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.expressions._

import scala.language.implicitConversions

/**
* These are all the operations that can be used to construct an [[Expression]] AST for expression
* operations.
Expand Down Expand Up @@ -63,7 +63,7 @@ trait ImplicitExpressionOperations {

def cast(toType: TypeInformation[_]) = Cast(expr, toType)

def as(name: Symbol) = Naming(expr, name.name)
def as(name: Symbol) = Alias(expr, name.name)

def asc = Asc(expr)
def desc = Desc(expr)
Expand Down Expand Up @@ -91,37 +91,37 @@ trait ImplicitExpressionOperations {
/**
* Calculates the Euler's number raised to the given power.
*/
def exp() = Call(BuiltInFunctionNames.EXP, expr)
def exp() = Exp(expr)

/**
* Calculates the base 10 logarithm of given value.
*/
def log10() = Call(BuiltInFunctionNames.LOG10, expr)
def log10() = Log10(expr)

/**
* Calculates the natural logarithm of given value.
*/
def ln() = Call(BuiltInFunctionNames.LN, expr)
def ln() = Ln(expr)

/**
* Calculates the given number raised to the power of the other value.
*/
def power(other: Expression) = Call(BuiltInFunctionNames.POWER, expr, other)
def power(other: Expression) = Power(expr, other)

/**
* Calculates the absolute value of given one.
*/
def abs() = Call(BuiltInFunctionNames.ABS, expr)
def abs() = Abs(expr)

/**
* Calculates the largest integer less than or equal to a given number.
*/
def floor() = Call(BuiltInFunctionNames.FLOOR, expr)
def floor() = Floor(expr)

/**
* Calculates the smallest integer greater than or equal to a given number.
*/
def ceil() = Call(BuiltInFunctionNames.CEIL, expr)
def ceil() = Ceil(expr)

/**
* Creates a substring of the given string between the given indices.
Expand All @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
* @param endIndex last character of the substring (starting at 1, inclusive)
* @return substring
*/
def substring(beginIndex: Expression, endIndex: Expression) = {
Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
}
def substring(beginIndex: Expression, endIndex: Expression) =
SubString(expr, beginIndex, endIndex)

/**
* Creates a substring of the given string beginning at the given index to the end.
*
* @param beginIndex first character of the substring (starting at 1, inclusive)
* @return substring
*/
def substring(beginIndex: Expression) = {
Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
}
def substring(beginIndex: Expression) =
new SubString(expr, beginIndex)

/**
* Removes leading and/or trailing characters from the given string.
Expand All @@ -155,25 +153,13 @@ trait ImplicitExpressionOperations {
def trim(
removeLeading: Boolean = true,
removeTrailing: Boolean = true,
character: Expression = BuiltInFunctionConstants.TRIM_DEFAULT_CHAR) = {
character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = {
if (removeLeading && removeTrailing) {
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_BOTH,
character,
expr)
Trim(TrimConstants.TRIM_BOTH, character, expr)
} else if (removeLeading) {
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_LEADING,
character,
expr)
Trim(TrimConstants.TRIM_LEADING, character, expr)
} else if (removeTrailing) {
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_TRAILING,
character,
expr)
Trim(TrimConstants.TRIM_TRAILING, character, expr)
} else {
expr
}
Expand All @@ -182,51 +168,39 @@ trait ImplicitExpressionOperations {
/**
* Returns the length of a String.
*/
def charLength() = {
Call(BuiltInFunctionNames.CHAR_LENGTH, expr)
}
def charLength() = CharLength(expr)

/**
* Returns all of the characters in a String in upper case using the rules of
* the default locale.
*/
def upperCase() = {
Call(BuiltInFunctionNames.UPPER_CASE, expr)
}
def upperCase() = Upper(expr)

/**
* Returns all of the characters in a String in lower case using the rules of
* the default locale.
*/
def lowerCase() = {
Call(BuiltInFunctionNames.LOWER_CASE, expr)
}
def lowerCase() = Lower(expr)

/**
* Converts the initial letter of each word in a String to uppercase.
* Assumes a String containing only [A-Za-z0-9], everything else is treated as whitespace.
*/
def initCap() = {
Call(BuiltInFunctionNames.INIT_CAP, expr)
}
def initCap() = InitCap(expr)

/**
* Returns true, if a String matches the specified LIKE pattern.
*
* e.g. "Jo_n%" matches all Strings that start with "Jo(arbitrary letter)n"
*/
def like(pattern: Expression) = {
Call(BuiltInFunctionNames.LIKE, expr, pattern)
}
def like(pattern: Expression) = Like(expr, pattern)

/**
* Returns true, if a String matches the specified SQL regex pattern.
*
* e.g. "A+" matches all Strings that consist of at least one A
*/
def similar(pattern: Expression) = {
Call(BuiltInFunctionNames.SIMILAR, expr, pattern)
}
def similar(pattern: Expression) = Similar(expr, pattern)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention}
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable}
import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
Expand Down Expand Up @@ -72,7 +74,7 @@ abstract class BatchTableEnvironment(
val m = internalNamePattern.findFirstIn(name)
m match {
case Some(_) =>
throw new TableException(s"Illegal Table name. " +
throw new ValidationException(s"Illegal Table name. " +
s"Please choose a name that does not contain the pattern $internalNamePattern")
case None =>
}
Expand All @@ -87,18 +89,15 @@ abstract class BatchTableEnvironment(
* The table to scan must be registered in the [[TableEnvironment]]'s catalog.
*
* @param tableName The name of the table to scan.
* @throws TableException if no table is registered under the given name.
* @throws ValidationException if no table is registered under the given name.
* @return The scanned table.
*/
@throws[TableException]
@throws[ValidationException]
def scan(tableName: String): Table = {

if (isRegistered(tableName)) {
relBuilder.scan(tableName)
new Table(relBuilder.build(), this)
}
else {
throw new TableException(s"Table \'$tableName\' was not found in the registry.")
new Table(this, CatalogNode(tableName, getRowType(tableName)))
} else {
throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
}
}

Expand Down Expand Up @@ -133,7 +132,7 @@ abstract class BatchTableEnvironment(
// transform to a relational tree
val relational = planner.rel(validated)

new Table(relational.rel, this)
new Table(this, LogicalRelNode(relational.rel))
}

/**
Expand Down Expand Up @@ -169,7 +168,7 @@ abstract class BatchTableEnvironment(
*/
private[flink] def explain(table: Table, extended: Boolean): String = {

val ast = RelOptUtil.toString(table.relNode)
val ast = RelOptUtil.toString(table.getRelNode)
val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
Expand Down Expand Up @@ -219,15 +218,10 @@ abstract class BatchTableEnvironment(
* @tparam T The type of the [[DataSet]].
*/
protected def registerDataSetInternal[T](
name: String, dataSet: DataSet[T],
fields: Array[Expression]): Unit = {
name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {

val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields.toArray)
val dataSetTable = new DataSetTable[T](
dataSet,
fieldIndexes.toArray,
fieldNames.toArray
)
val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
registerTableInternal(name, dataSetTable)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
import org.apache.calcite.sql.validate.SqlValidator
import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig}
import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig}
import org.apache.calcite.util.Util
import scala.collection.JavaConversions._

Expand Down Expand Up @@ -96,7 +96,7 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
}
catch {
case e: RuntimeException => {
throw new ValidationException(e)
throw new CValidationException(e)
}
}
validatedSqlNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.Programs

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention}
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.api.table.plan.schema.
Expand Down Expand Up @@ -86,18 +88,17 @@ abstract class StreamTableEnvironment(
* The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
*
* @param tableName The name of the table to ingest.
* @throws TableException if no table is registered under the given name.
* @throws ValidationException if no table is registered under the given name.
* @return The ingested table.
*/
@throws[TableException]
@throws[ValidationException]
def ingest(tableName: String): Table = {

if (isRegistered(tableName)) {
relBuilder.scan(tableName)
new Table(relBuilder.build(), this)
new Table(this, CatalogNode(tableName, getRowType(tableName)))
}
else {
throw new TableException(s"Table \'$tableName\' was not found in the registry.")
throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
}
}

Expand Down Expand Up @@ -132,7 +133,7 @@ abstract class StreamTableEnvironment(
// transform to a relational tree
val relational = planner.rel(validated)

new Table(relational.rel, this)
new Table(this, LogicalRelNode(relational.rel))
}

/**
Expand Down Expand Up @@ -240,7 +241,7 @@ abstract class StreamTableEnvironment(
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {

val relNode = table.relNode
val relNode = table.getRelNode

// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
Expand Down
Loading

0 comments on commit f0d543f

Please sign in to comment.