Skip to content

Commit

Permalink
[FLINK-7814] [table] Add BETWEEN and NOT BETWEEN expression to Table API
Browse files Browse the repository at this point in the history
This closes apache#6027.
  • Loading branch information
Xpray authored and twalthr committed May 23, 2018
1 parent 2aca825 commit 5563681
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 1 deletion.
48 changes: 48 additions & 0 deletions docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,30 @@ ANY.in(TABLE)
</td>
</tr>

<tr>
<td>
{% highlight java %}
ANY.between(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
<p>Returns true if the given expression is between <i>lowerBound</i> and <i>upperBound</i> (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.
</p>
</td>
</tr>

<tr>
<td>
{% highlight java %}
ANY.notBetween(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
<p>Returns true if the given expression is not between <i>lowerBound</i> and <i>upperBound</i> (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.
</p>
</td>
</tr>

</tbody>
</table>

Expand Down Expand Up @@ -3337,6 +3361,30 @@ ANY.in(TABLE)
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ANY.between(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
<p>Returns true if the given expression is between <i>lowerBound</i> and <i>upperBound</i> (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.
</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ANY.notBetween(lowerBound, upperBound)
{% endhighlight %}
</td>
<td>
<p>Returns true if the given expression is not between <i>lowerBound</i> and <i>upperBound</i> (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.
</p>
</td>
</tr>

</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,28 @@ trait ImplicitExpressionOperations {
*/
def sha2(hashLength: Expression) = Sha2(expr, hashLength)

/**
* Returns true if the given expression is between lowerBound and upperBound (both inclusive).
* False otherwise. The parameters must be numeric types or identical comparable types.
*
* @param lowerBound numeric or comparable expression
* @param upperBound numeric or comparable expression
* @return boolean or null
*/
def between(lowerBound: Expression, upperBound: Expression) =
Between(expr, lowerBound, upperBound)

/**
* Returns true if the given expression is not between lowerBound and upperBound (both
* inclusive). False otherwise. The parameters must be numeric types or identical
* comparable types.
*
* @param lowerBound numeric or comparable expression
* @param upperBound numeric or comparable expression
* @return boolean or null
*/
def notBetween(lowerBound: Expression, upperBound: Expression) =
NotBetween(expr, lowerBound, upperBound)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isComparable, isNumeric}
import org.apache.flink.table.validate._

Expand Down Expand Up @@ -163,3 +163,78 @@ case class IsNotFalse(child: Expression) extends UnaryExpression {

override private[flink] def resultType = BOOLEAN_TYPE_INFO
}

abstract class BetweenComparison(
expr: Expression,
lowerBound: Expression,
upperBound: Expression)
extends Expression {

override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.BOOLEAN_TYPE_INFO

override private[flink] def children: Seq[Expression] = Seq(expr, lowerBound, upperBound)

override private[flink] def validateInput(): ValidationResult = {
(expr.resultType, lowerBound.resultType, upperBound.resultType) match {
case (exprType, lowerType, upperType)
if isNumeric(exprType) && isNumeric(lowerType) && isNumeric(upperType) =>
ValidationSuccess
case (exprType, lowerType, upperType)
if isComparable(exprType) && exprType == lowerType && exprType == upperType =>
ValidationSuccess
case (exprType, lowerType, upperType) =>
ValidationFailure(
s"Between is only supported for numeric types and " +
s"identical comparable types, but got $exprType, $lowerType and $upperType"
)
}
}
}

case class Between(
expr: Expression,
lowerBound: Expression,
upperBound: Expression)
extends BetweenComparison(expr, lowerBound, upperBound) {

override def toString: String = s"($expr).between($lowerBound, $upperBound)"

override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.and(
relBuilder.call(
SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
expr.toRexNode,
lowerBound.toRexNode
),
relBuilder.call(
SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
expr.toRexNode,
upperBound.toRexNode
)
)
}
}

case class NotBetween(
expr: Expression,
lowerBound: Expression,
upperBound: Expression)
extends BetweenComparison(expr, lowerBound, upperBound) {

override def toString: String = s"($expr).notBetween($lowerBound, $upperBound)"

override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.or(
relBuilder.call(
SqlStdOperatorTable.LESS_THAN,
expr.toRexNode,
lowerBound.toRexNode
),
relBuilder.call(
SqlStdOperatorTable.GREATER_THAN,
expr.toRexNode,
upperBound.toRexNode
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ object FunctionCatalog {
"isNotTrue" -> classOf[IsNotTrue],
"isNotFalse" -> classOf[IsNotFalse],
"if" -> classOf[If],
"between" -> classOf[Between],
"notBetween" -> classOf[NotBetween],

// aggregate functions
"avg" -> classOf[Avg],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,137 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
"trueX")
testTableApi(12.isNull, "12.isNull", "false")
}

@Test
def testBetween(): Unit = {
// between
testAllApis(
4.between(Null(Types.INT), 3),
"4.between(Null(INT), 3)",
"4 BETWEEN NULL AND 3",
"false"
)
testAllApis(
4.between(Null(Types.INT), 12),
"4.between(Null(INT), 12)",
"4 BETWEEN NULL AND 12",
"null"
)
testAllApis(
4.between(Null(Types.INT), 3),
"4.between(Null(INT), 3)",
"4 BETWEEN 5 AND NULL",
"false"
)
testAllApis(
4.between(Null(Types.INT), 12),
"4.between(Null(INT), 12)",
"4 BETWEEN 0 AND NULL",
"null"
)
testAllApis(
4.between(1, 3),
"4.between(1, 3)",
"4 BETWEEN 1 AND 3",
"false"
)
testAllApis(
2.between(1, 3),
"2.between(1, 3)",
"2 BETWEEN 1 AND 3",
"true"
)
testAllApis(
2.between(2, 2),
"2.between(2, 2)",
"2 BETWEEN 2 AND 2",
"true"
)
testAllApis(
2.1.between(2.0, 3.0),
"2.1.between(2.0, 3.0)",
"2.1 BETWEEN 2.0 AND 3.0",
"true"
)
testAllApis(
2.1.between(2.1, 2.1),
"2.1.between(2.1, 2.1)",
"2.1 BETWEEN 2.1 AND 2.1",
"true"
)
testAllApis(
"b".between("a", "c"),
"'b'.between('a', 'c')",
"'b' BETWEEN 'a' AND 'c'",
"true"
)
testAllApis(
"b".between("b", "c"),
"'b'.between('b', 'c')",
"'b' BETWEEN 'b' AND 'c'",
"true"
)
testAllApis(
"2018-05-05".toDate.between("2018-05-01".toDate, "2018-05-10".toDate),
"'2018-05-05'.toDate.between('2018-05-01'.toDate, '2018-05-10'.toDate)",
"DATE '2018-05-05' BETWEEN DATE '2018-05-01' AND DATE '2018-05-10'",
"true"
)

// not between
testAllApis(
2.notBetween(Null(Types.INT), 3),
"2.notBetween(Null(INT), 3)",
"2 NOT BETWEEN NULL AND 3",
"null"
)
testAllApis(
2.notBetween(0, 1),
"2.notBetween(0, 1)",
"2 NOT BETWEEN 0 AND 1",
"true"
)
testAllApis(
2.notBetween(1, 3),
"2.notBetween(1, 3)",
"2 NOT BETWEEN 1 AND 3",
"false"
)
testAllApis(
2.notBetween(2, 2),
"2.notBetween(2, 2)",
"2 NOT BETWEEN 2 AND 2",
"false"
)
testAllApis(
2.1.notBetween(2.0, 3.0),
"2.1.notBetween(2.0, 3.0)",
"2.1 NOT BETWEEN 2.0 AND 3.0",
"false"
)
testAllApis(
2.1.notBetween(2.1, 2.1),
"2.1.notBetween(2.1, 2.1)",
"2.1 NOT BETWEEN 2.1 AND 2.1",
"false"
)
testAllApis(
"b".notBetween("a", "c"),
"'b'.notBetween('a', 'c')",
"'b' NOT BETWEEN 'a' AND 'c'",
"false"
)
testAllApis(
"b".notBetween("b", "c"),
"'b'.notBetween('b', 'c')",
"'b' NOT BETWEEN 'b' AND 'c'",
"false"
)
testAllApis(
"2018-05-05".toDate.notBetween("2018-05-01".toDate, "2018-05-10".toDate),
"'2018-05-05'.toDate.notBetween('2018-05-01'.toDate, '2018-05-10'.toDate)",
"DATE '2018-05-05' NOT BETWEEN DATE '2018-05-01' AND DATE '2018-05-10'",
"false"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("NULLIF(1,1) IS DISTINCT FROM NULLIF(1,1)", "false")
testSqlApi("NULLIF(1,1) IS NOT DISTINCT FROM NULLIF(1,1)", "true")
testSqlApi("NULLIF(1,1) IS NOT DISTINCT FROM NULLIF(1,1)", "true")
testSqlApi("12 BETWEEN NULL AND 13", "null")
testSqlApi("12 BETWEEN 11 AND 13", "true")
testSqlApi("12 BETWEEN ASYMMETRIC 13 AND 11", "false")
testSqlApi("12 BETWEEN SYMMETRIC 13 AND 11", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,22 @@ class ScalarOperatorsValidationTest extends ScalarOperatorsTestBase {
"FAIL"
)
}

@Test(expected = classOf[ValidationException])
def testBetweenWithDifferentOperandTypeScala(): Unit = {
testTableApi(
2.between(1, "a"),
"FAIL",
"FAIL"
)
}

@Test(expected = classOf[ValidationException])
def testBetweenWithDifferentOperandTypeJava(): Unit = {
testTableApi(
"FAIL",
"2.between(1, 'a')",
"FAIL"
)
}
}

0 comments on commit 5563681

Please sign in to comment.