Skip to content

Commit

Permalink
[FLINK-3086] [table] ExpressionParser does not support concatenation …
Browse files Browse the repository at this point in the history
…of suffix operations

This closes apache#1912.
  • Loading branch information
twalthr committed Apr 27, 2016
1 parent 445d5e3 commit 16059ea
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 125 deletions.
42 changes: 26 additions & 16 deletions docs/apis/batch/libs/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,42 +396,52 @@ val result = in.distinct();
### Expression Syntax
Some of the operators in previous sections expect one or more expressions. Expressions can be specified using an embedded Scala DSL or as Strings. Please refer to the examples above to learn how expressions can be specified.

This is the complete EBNF grammar for expressions:
This is the EBNF grammar for expressions:

{% highlight ebnf %}

expression = single expression , { "," , single expression } ;
expressionList = expression , { "," , expression } ;

single expression = alias | logic ;
expression = alias ;

alias = logic | logic , "AS" , field reference ;
alias = logic | ( logic , "AS" , fieldReference ) ;

logic = comparison , [ ( "&&" | "||" ) , comparison ] ;

comparison = term , [ ( "=" | "!=" | ">" | ">=" | "<" | "<=" ) , term ] ;
comparison = term , [ ( "=" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

unary = [ "!" | "-" ] , suffix ;
product = unary , [ ( "*" | "/" | "%") , unary ] ;

suffix = atom | aggregation | cast | as ;
unary = [ "!" | "-" ] , composite ;

aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ;
composite = suffixed | atom ;

cast = atom , ".cast(" , data type , ")" ;
suffixed = cast | as | aggregation | nullCheck | evaluate | functionCall ;

data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" ;
cast = composite , ".cast(" , dataType , ")" ;

as = atom , ".as(" , field reference , ")" ;
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" ;

atom = ( "(" , single expression , ")" ) | literal | field reference ;
as = composite , ".as(" , fieldReference , ")" ;

{% endhighlight %}
aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" ) , [ "()" ] ;

nullCheck = composite , ( ".isNull" | ".isNotNull" ) , [ "()" ] ;

evaluate = composite , ".eval(" , expression , "," , expression , ")" ;

functionCall = composite , "." , functionIdentifier , "(" , [ expression , { "," , expression } ] , ")"

Here, `literal` is a valid Java literal and `field reference` specifies a column in the data. The
column names follow Java identifier syntax.
atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;

nullLiteral = "Null(" , dataType , ")" ;

{% endhighlight %}

Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LONG` casted to `DATE` must be a milliseconds timestamp. A `STRING` casted to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", "`HH:mm:ss`", or a milliseconds timestamp. By default, all timestamps refer to the UTC timezone beginning from January 1, 1970, 00:00:00 in milliseconds.
Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data, and `functionIdentifier` specifies a supported scalar function. The
column names and function names follow Java identifier syntax. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and functions.

{% top %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.flink.api.table.expressions

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
import org.apache.flink.api.table.ExpressionParserException

import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
Expand Down Expand Up @@ -47,6 +47,29 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val MIN: Keyword = Keyword("min")
lazy val MAX: Keyword = Keyword("max")
lazy val SUM: Keyword = Keyword("sum")
lazy val IS_NULL: Keyword = Keyword("isNull")
lazy val IS_NOT_NULL: Keyword = Keyword("isNotNull")
lazy val CAST: Keyword = Keyword("cast")
lazy val NULL: Keyword = Keyword("Null")
lazy val EVAL: Keyword = Keyword("eval")

def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
not(SUM) ~ not(IS_NULL) ~ not(IS_NOT_NULL) ~ not(CAST) ~ not(NULL) ~
not(EVAL) ~> super.ident

// data types

lazy val dataType: PackratParser[TypeInformation[_]] =
"BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
"SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
"INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
"LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
"FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
"DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
("BOOL" | "BOOLEAN" ) ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
"STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
"DATE" ^^ { ti => BasicTypeInfo.DATE_TYPE_INFO }

// Literals

Expand Down Expand Up @@ -77,22 +100,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
str => Literal(str.toBoolean)
}

lazy val nullLiteral: PackratParser[Expression] =
"Null(BYTE)" ^^ { e => Null(BasicTypeInfo.BYTE_TYPE_INFO) } |
"Null(SHORT)" ^^ { e => Null(BasicTypeInfo.SHORT_TYPE_INFO) } |
"Null(INT)" ^^ { e => Null(BasicTypeInfo.INT_TYPE_INFO) } |
"Null(LONG)" ^^ { e => Null(BasicTypeInfo.LONG_TYPE_INFO) } |
"Null(FLOAT)" ^^ { e => Null(BasicTypeInfo.FLOAT_TYPE_INFO) } |
"Null(DOUBLE)" ^^ { e => Null(BasicTypeInfo.DOUBLE_TYPE_INFO) } |
"Null(BOOL)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
"Null(BOOLEAN)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
"Null(STRING)" ^^ { e => Null(BasicTypeInfo.STRING_TYPE_INFO) } |
"Null(DATE)" ^^ { e => Null(BasicTypeInfo.DATE_TYPE_INFO) }
lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
case dt => Null(dt)
}

lazy val literalExpr: PackratParser[Expression] =
numberLiteral |
stringLiteralFlink | singleQuoteStringLiteral |
boolLiteral
boolLiteral | nullLiteral

lazy val fieldReference: PackratParser[Expression] = ident ^^ {
case sym => UnresolvedFieldReference(sym)
Expand All @@ -103,85 +118,114 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {

// suffix operators

lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }


lazy val sum: PackratParser[Expression] =
(atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
lazy val min: PackratParser[Expression] =
(atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
lazy val max: PackratParser[Expression] =
(atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
lazy val count: PackratParser[Expression] =
(atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
lazy val avg: PackratParser[Expression] =
(atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })

lazy val cast: PackratParser[Expression] =
atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }

lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
case e ~ _ ~ target ~ _ => Naming(e, target.name)
}
lazy val suffixIsNull: PackratParser[Expression] =
composite <~ "." ~ IS_NULL ~ opt("()") ^^ { e => IsNull(e) }

lazy val eval: PackratParser[Expression] = atom ~
".eval(" ~ expression ~ "," ~ expression ~ ")" ^^ {
case condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => Eval(condition, ifTrue, ifFalse)
}
lazy val suffixIsNotNull: PackratParser[Expression] =
composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) }

// general function calls
lazy val suffixSum: PackratParser[Expression] =
composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }

lazy val functionCall = ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
}
lazy val suffixMin: PackratParser[Expression] =
composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }

lazy val functionCallWithoutArgs = ident ~ "()" ^^ {
case name ~ _ => Call(name.toUpperCase)
}
lazy val suffixMax: PackratParser[Expression] =
composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }

lazy val suffixFunctionCall = atom ~ "." ~ ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
lazy val suffixCount: PackratParser[Expression] =
composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }

lazy val suffixAvg: PackratParser[Expression] =
composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }

lazy val suffixCast: PackratParser[Expression] =
composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
}

lazy val suffixFunctionCallWithoutArgs = atom ~ "." ~ ident ~ "()" ^^ {
case operand ~ _ ~ name ~ _ => Call(name.toUpperCase, operand)
lazy val suffixAs: PackratParser[Expression] =
composite ~ "." ~ AS ~ "(" ~ fieldReference ~ ")" ^^ {
case e ~ _ ~ _ ~ _ ~ target ~ _ => Naming(e, target.name)
}

// special calls
lazy val suffixEval: PackratParser[Expression] =
composite ~ "." ~ EVAL ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => Eval(condition, ifTrue, ifFalse)
}

lazy val specialFunctionCalls = trim | trimWithoutArgs
lazy val suffixFunctionCall =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*)
}

lazy val specialSuffixFunctionCalls = suffixTrim | suffixTrimWithoutArgs
lazy val suffixTrim = composite ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
expression ~ ")" ^^ {
case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
val flag = trimType match {
case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
}
Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
}

lazy val trimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
case _ ~ operand ~ _ =>
lazy val suffixTrimWithoutArgs = composite <~ ".trim" ~ opt("()") ^^ {
case e =>
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_BOTH,
BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
operand)
e)
}

lazy val suffixTrimWithoutArgs = atom ~ ".trim()" ^^ {
case operand ~ _ =>
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_BOTH,
BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
operand)
lazy val suffixed: PackratParser[Expression] =
suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg |
suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixEval | suffixFunctionCall

// prefix operators

lazy val prefixIsNull: PackratParser[Expression] =
IS_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNull(e) }

lazy val prefixIsNotNull: PackratParser[Expression] =
IS_NOT_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNotNull(e) }

lazy val prefixSum: PackratParser[Expression] =
SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }

lazy val prefixMin: PackratParser[Expression] =
MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }

lazy val prefixMax: PackratParser[Expression] =
MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }

lazy val prefixCount: PackratParser[Expression] =
COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }

lazy val prefixAvg: PackratParser[Expression] =
AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }

lazy val prefixCast: PackratParser[Expression] =
CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
}

lazy val trim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~
lazy val prefixAs: PackratParser[Expression] =
AS ~ "(" ~ expression ~ "," ~ fieldReference ~ ")" ^^ {
case _ ~ _ ~ e ~ _ ~ target ~ _ => Naming(e, target.name)
}

lazy val prefixEval: PackratParser[Expression] = composite ~
EVAL ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => Eval(condition, ifTrue, ifFalse)
}

lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
}

lazy val prefixTrim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~
"," ~ expression ~ ")" ^^ {
case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
val flag = trimType match {
Expand All @@ -192,31 +236,30 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
}

lazy val suffixTrim = atom ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~
expression ~ ")" ^^ {
case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
val flag = trimType match {
case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
}
Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
lazy val prefixTrimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
case _ ~ operand ~ _ =>
Call(
BuiltInFunctionNames.TRIM,
BuiltInFunctionConstants.TRIM_BOTH,
BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
operand)
}

lazy val suffix =
isNull | isNotNull |
sum | min | max | count | avg | cast | nullLiteral | eval |
specialFunctionCalls | functionCall | functionCallWithoutArgs |
specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs |
atom
lazy val prefixed: PackratParser[Expression] =
prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixEval | prefixFunctionCall

// suffix/prefix composite

lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom

// unary ops

lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e => Not(e) }

lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => UnaryMinus(e) }

lazy val unary = unaryNot | unaryMinus | suffix
lazy val unary = unaryNot | unaryMinus | composite

// arithmetic

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,6 @@
*/
package org.apache.flink.api.java.table.test;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.table.ExpressionParserException;
Expand Down Expand Up @@ -180,7 +162,7 @@ public void testNonWorkingDataTypes() throws Exception {
compareResultAsText(results, expected);
}

@Test(expected = ExpressionParserException.class)
@Test(expected = UnsupportedOperationException.class)
public void testNoNestedAggregation() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Expand Down
Loading

0 comments on commit 16059ea

Please sign in to comment.