Commit 50d8797

[FLINK-3739] [table] Add a null literal to Table API

This closes apache#1880.

twalthr committed Apr 15, 2016
1 parent 494212b commit 50d8797

Showing 11 changed files with 163 additions and 18 deletions.
7 changes: 7 additions & 0 deletions docs/apis/batch/libs/table.md
@@ -560,3 +560,10 @@ val result = tableEnv.sql("SELECT * FROM MyTable")

{% top %}

Runtime Configuration
----
The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed through the `TableEnvironment` or passed to the `toDataSet`/`toDataStream` methods when using the Scala implicit conversions.

### Null Handling
By default, the Table API does not support `null` values at runtime for efficiency purposes. Null handling can be enabled by setting the `nullCheck` property in the `TableConfig` to `true`.
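A minimal sketch of what this could look like in user code. The `TableConfig` constructor and the `setNullCheck` setter are assumptions made for illustration; only the `getNullCheck` accessor and the `toDataSet[Row](config)` call appear in the tests of this commit.

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.expressions.Null
import org.apache.flink.api.table.{Row, TableConfig}

object NullHandlingSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Enable null handling at runtime (constructor and setter name assumed).
    val config = new TableConfig()
    config.setNullCheck(true)

    // Select a null literal; with nullCheck enabled it is emitted as null,
    // otherwise a type-specific default is produced (the tests in this commit
    // expect -1 for a null INT when null checking is off).
    val table = env.fromElements((1, 0)).as('a, 'b)
      .select('a, 'b, Null(BasicTypeInfo.INT_TYPE_INFO))

    // The config is passed when converting the Table back to a DataSet.
    val result = table.toDataSet[Row](config)
    result.print()
  }
}
```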

@@ -27,7 +27,7 @@ import scala.language.implicitConversions
* operations.
*
* These operations must be kept in sync with the parser in
* [[org.apache.flink.api.table.parser.ExpressionParser]].
* [[org.apache.flink.api.table.expressions.ExpressionParser]].
*/
trait ImplicitExpressionOperations {
def expr: Expression
@@ -521,6 +521,11 @@ class CodeGenerator(
override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
val value = literal.getValue3
// null value with type
if (value == null) {
return generateNullLiteral(resultType)
}
// non-null values
literal.getType.getSqlTypeName match {
case BOOLEAN =>
generateNonNullLiteral(resultType, literal.getValue3.toString)
@@ -574,8 +579,6 @@
}
case VARCHAR | CHAR =>
generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
case NULL =>
generateNullLiteral(resultType)
case SYMBOL =>
val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
generateNonNullLiteral(resultType, symbolOrdinal.toString)
@@ -742,6 +745,12 @@
}
}

override def visitOver(over: RexOver): GeneratedExpression = ???

// ----------------------------------------------------------------------------------------------
// generator helping methods
// ----------------------------------------------------------------------------------------------

def checkNumericOrString(left: GeneratedExpression, right: GeneratedExpression): Unit = {
if (isNumeric(left)) {
requireNumeric(right)
@@ -750,12 +759,6 @@
}
}

override def visitOver(over: RexOver): GeneratedExpression = ???

// ----------------------------------------------------------------------------------------------
// generator helping methods
// ----------------------------------------------------------------------------------------------

private def generateInputAccess(
inputType: TypeInformation[Any],
inputTerm: String,
@@ -906,7 +909,7 @@

val wrappedCode = if (nullCheck) {
s"""
|$resultTypeTerm $resultTerm = null;
|$resultTypeTerm $resultTerm = $defaultValue;
|boolean $nullTerm = true;
|""".stripMargin
} else {
@@ -77,6 +77,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
str => Literal(str.toBoolean)
}

lazy val nullLiteral: PackratParser[Expression] =
"Null(BYTE)" ^^ { e => Null(BasicTypeInfo.BYTE_TYPE_INFO) } |
"Null(SHORT)" ^^ { e => Null(BasicTypeInfo.SHORT_TYPE_INFO) } |
"Null(INT)" ^^ { e => Null(BasicTypeInfo.INT_TYPE_INFO) } |
"Null(LONG)" ^^ { e => Null(BasicTypeInfo.LONG_TYPE_INFO) } |
"Null(FLOAT)" ^^ { e => Null(BasicTypeInfo.FLOAT_TYPE_INFO) } |
"Null(DOUBLE)" ^^ { e => Null(BasicTypeInfo.DOUBLE_TYPE_INFO) } |
"Null(BOOL)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
"Null(BOOLEAN)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
"Null(STRING)" ^^ { e => Null(BasicTypeInfo.STRING_TYPE_INFO) } |
"Null(DATE)" ^^ { e => Null(BasicTypeInfo.DATE_TYPE_INFO) }

lazy val literalExpr: PackratParser[Expression] =
numberLiteral |
stringLiteralFlink | singleQuoteStringLiteral |
@@ -188,8 +200,8 @@

lazy val suffix =
isNull | isNotNull |
sum | min | max | count | avg | cast |
specialFunctionCalls |functionCall | functionCallWithoutArgs |
sum | min | max | count | avg | cast | nullLiteral |
specialFunctionCalls | functionCall | functionCallWithoutArgs |
specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs |
atom

@@ -23,6 +23,7 @@ import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.scala.table.ImplicitExpressionOperations
import org.apache.flink.api.table.typeutils.TypeConverter

object Literal {
def apply(l: Any): Literal = l match {
@@ -49,3 +50,14 @@ case class Literal(value: Any, tpe: TypeInformation[_])
relBuilder.literal(value)
}
}

case class Null(tpe: TypeInformation[_]) extends LeafExpression {
def expr = this
def typeInfo = tpe

override def toString = s"null"

override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe))
}
}
@@ -21,8 +21,8 @@ package org.apache.flink.api.table
* This package contains the base class of AST nodes and all the expression language AST classes.
* Expression trees should not be manually constructed by users. They are implicitly constructed
* from the implicit DSL conversions in
* [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
* [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
* [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
* [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
* expression trees should be generated from a string parser that parses expressions and creates
* AST nodes.
*/
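To make the two construction paths described in this comment concrete, here is a brief sketch built only from expressions that appear in this commit's tests; the environment setup is assumed, and the string-based `select` overload is the one exercised by the Java test.

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.expressions.Null

object ExpressionConstructionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val table = env.fromElements((1, 0)).as('a, 'b)

    // Scala DSL: the expression tree is assembled implicitly from symbols
    // and expression case classes such as the new Null(...) literal.
    val fromDsl = table.select('a, 'b, Null(BasicTypeInfo.INT_TYPE_INFO))

    // String-based API (the Java path): the same tree is produced by
    // ExpressionParser from an expression string, using the new "Null(TYPE)" syntax.
    val fromString = table.select("a, b, Null(INT)")
  }
}
```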
@@ -75,6 +75,10 @@ object TypeConverter {
case VARCHAR | CHAR => STRING_TYPE_INFO
case DATE => DATE_TYPE_INFO

case NULL =>
throw new TableException("Type NULL is not supported. " +
"Null values must have a supported type.")

// symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
// are represented as integer
case SYMBOL => INT_TYPE_INFO
@@ -100,5 +100,30 @@ public void testComparisons() throws Exception {
compareResultAsText(results, expected);
}

@Test
public void testNullLiteral() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

DataSource<Tuple2<Integer, Integer>> input =
env.fromElements(new Tuple2<>(1, 0));

Table table =
tableEnv.fromDataSet(input, "a, b");

Table result = table.select("a, b, Null(INT), Null(STRING) === ''");

DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
List<Row> results = ds.collect();
String expected;
if (getConfig().getNullCheck()) {
expected = "1,0,null,null";
}
else {
expected = "1,0,-1,true";
}
compareResultAsText(results, expected);
}

}

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.sql.test

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.plan.TranslationContext
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

import scala.collection.JavaConverters._

@RunWith(classOf[Parameterized])
class ExpressionsITCase(
mode: TestExecutionMode,
configMode: TableConfigMode)
extends TableProgramsTestBase(mode, configMode) {

@Test
def testNullLiteral(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = getScalaTableEnvironment
TranslationContext.reset()

val sqlQuery = "SELECT a, b, CAST(NULL AS INT), CAST(NULL AS VARCHAR) = '' FROM MyTable"

val ds = env.fromElements((1, 0))
tEnv.registerDataSet("MyTable", ds, 'a, 'b)

val result = tEnv.sql(sqlQuery)

val expected = if (getConfig.getNullCheck) {
"1,0,null,null"
} else {
"1,0,-1,true"
}
val results = result.toDataSet[Row](getConfig).collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

}
@@ -18,7 +18,6 @@

package org.apache.flink.api.scala.sql.test

import org.apache.calcite.tools.ValidationException
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
@@ -24,10 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.table.expressions.{Literal, Null}
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
@@ -91,6 +90,26 @@ class ExpressionsITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testNullLiteral(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment

val t = env.fromElements((1, 0)).as('a, 'b)
.select(
'a,
'b,
Null(BasicTypeInfo.INT_TYPE_INFO),
Null(BasicTypeInfo.STRING_TYPE_INFO) === "")

val expected = if (getConfig.getNullCheck) {
"1,0,null,null"
} else {
"1,0,-1,true"
}
val results = t.toDataSet[Row](getConfig).collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

// Date literals not yet supported
@Ignore
@Test
