Skip to content

Commit

Permalink
[FLINK-16935][table-planner-blink] Enable or delete most of the ignor…
Browse files Browse the repository at this point in the history
…ed test cases in blink planner.

This closes apache#11874
  • Loading branch information
KurtYoung committed Apr 25, 2020
1 parent f6f2fb5 commit 490e2af
Show file tree
Hide file tree
Showing 24 changed files with 177 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ object SortUtil {
if (fetch != null) {
getLimitStart(offset) + RexLiteral.intValue(fetch)
} else {
// TODO return Long.MaxValue when providing FlinkRelMdRowCount on Sort ?
Integer.MAX_VALUE
Long.MaxValue
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testInWithFilter">
<Resource name="planAfter">
<![CDATA[
HashJoin(joinType=[LeftSemiJoin], where=[=(c, a1)], select=[a, b, c], build=[right], tryDistinctBuildRow=[true])
:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
: +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
+- Exchange(distribution=[hash[a1]])
+- LocalHashAggregate(groupBy=[a1], select=[a1])
+- Calc(select=[a AS a1], where=[=(b, _UTF-16LE'two')])
+- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>

<TestCase name="testInWithProject">
<Resource name="planBefore">
<![CDATA[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http:https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testSimpleJoin">
<Resource name="planAfter">
<![CDATA[
Calc(select=[*(o_amount, rate) AS rate])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
:- Exchange(distribution=[hash[o_currency]])
: +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+- Exchange(distribution=[hash[currency]])
+- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
]]>
</Resource>
</TestCase>

<TestCase name="testSimpleJoin2">
<Resource name="planAfter">
<![CDATA[
Calc(select=[*(o_amount, rate) AS rate])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
:- Exchange(distribution=[hash[o_currency]])
: +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+- Exchange(distribution=[hash[currency]])
+- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
]]>
</Resource>
</TestCase>

<TestCase name="testSimpleProctimeJoin">
<Resource name="planAfter">
<![CDATA[
Calc(select=[*(o_amount, rate) AS rate])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_proctime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_proctime, currency, rate, proctime])
:- Exchange(distribution=[hash[o_currency]])
: +- DataStreamScan(table=[[default_catalog, default_database, ProctimeOrders]], fields=[o_amount, o_currency, o_proctime])
+- Exchange(distribution=[hash[currency]])
+- DataStreamScan(table=[[default_catalog, default_database, ProctimeRatesHistory]], fields=[currency, rate, proctime])
]]>
</Resource>
</TestCase>

<TestCase name="testTemporalTableFunctionOnTopOfQuery">
<Resource name="planAfter">
<![CDATA[
Calc(select=[*(o_amount, rate) AS rate])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
:- Exchange(distribution=[hash[o_currency]])
: +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+- Exchange(distribution=[hash[currency]])
+- Calc(select=[currency, *(rate, 2) AS rate, rowtime], where=[>(rate, 100)])
+- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
]]>
</Resource>
</TestCase>

<TestCase name="testComplexJoin">
<Resource name="planAfter">
<![CDATA[
Join(joinType=[InnerJoin], where=[=(t3_secondary_key, secondary_key)], select=[rate, secondary_key, t3_comment, t3_secondary_key], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[secondary_key]])
: +- Calc(select=[*(o_amount, rate) AS rate, secondary_key])
: +- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), OR(=(currency, o_currency), =(secondary_key, o_secondary_key)))], select=[o_rowtime, o_amount, o_currency, o_secondary_key, rowtime, currency, rate, secondary_key])
: :- Exchange(distribution=[single])
: : +- Calc(select=[rowtime AS o_rowtime, o_amount, o_currency, o_secondary_key])
: : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[rowtime, o_comment, o_amount, o_currency, o_secondary_key])
: +- Exchange(distribution=[single])
: +- Calc(select=[rowtime, currency, rate, secondary_key], where=[>(rate, 110:BIGINT)])
: +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[rowtime, comment, currency, rate, secondary_key])
+- Exchange(distribution=[hash[t3_secondary_key]])
+- DataStreamScan(table=[[default_catalog, default_database, ThirdTable]], fields=[t3_comment, t3_secondary_key])
]]>
</Resource>
</TestCase>
</Root>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
import org.apache.flink.table.types.logical.DecimalType
import org.apache.flink.types.Row

import org.junit.{Ignore, Test}

class DecimalTypeTest extends ExpressionTestBase {
Expand Down Expand Up @@ -132,7 +133,7 @@ class DecimalTypeTest extends ExpressionTestBase {
@Ignore
@Test
def testDefaultDecimalCasting(): Unit = {
// from String
// // from String
testTableApi(
"123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)),
"'123456789123456789123456789'.cast(DECIMAL)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,71 +23,65 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row

import org.junit.{Ignore, Test}
import org.junit.Test

/**
* Tests that can only be checked manually as they are non-deterministic.
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {

@Ignore
@Test
def testCurrentDate(): Unit = {
testAllApis(
currentDate(),
"currentDate()",
"CURRENT_DATE",
"PLEASE CHECK MANUALLY")
currentDate().isGreater("1970-01-01".toDate),
"currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
}

@Ignore
@Test
def testCurrentTime(): Unit = {
testAllApis(
currentTime(),
"currentTime()",
"CURRENT_TIME",
"PLEASE CHECK MANUALLY")
currentTime().isGreaterOrEqual("00:00:00".toTime),
"currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
}

@Ignore
@Test
def testCurrentTimestamp(): Unit = {
testAllApis(
currentTimestamp(),
"currentTimestamp()",
"CURRENT_TIMESTAMP",
"PLEASE CHECK MANUALLY")
currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
"currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
"CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
"true")
}

@Ignore
@Test
def testLocalTimestamp(): Unit = {
testAllApis(
localTimestamp(),
"localTimestamp()",
"LOCALTIMESTAMP",
"PLEASE CHECK MANUALLY")
localTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
"localTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
"LOCALTIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
"true")
}

@Ignore
@Test
def testLocalTime(): Unit = {
testAllApis(
localTime(),
"localTime()",
"LOCALTIME",
"PLEASE CHECK MANUALLY")
localTime().isGreaterOrEqual("00:00:00".toTime),
"localTime() >= '00:00:00'.toTime",
"LOCALTIME >= TIME '00:00:00'",
"true")
}

@Ignore
@Test
def testUUID(): Unit = {
testAllApis(
uuid(),
"uuid()",
"UUID()",
"PLEASE CHECK MANUALLY")
uuid().charLength(),
"uuid().charLength",
"CHARACTER_LENGTH(UUID())",
"36")
}

// ----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,29 @@ import org.apache.flink.table.api.{SqlParserException, ValidationException}
import org.apache.flink.table.expressions.TimePointUnit
import org.apache.flink.table.planner.codegen.CodeGenException
import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase

import org.apache.calcite.avatica.util.TimeUnit
import org.junit.{Ignore, Test}
import org.junit.Test

class ScalarFunctionsValidationTest extends ScalarTypesTestBase {

// ----------------------------------------------------------------------------------------------
// Math functions
// ----------------------------------------------------------------------------------------------

@Ignore
@Test
def testInvalidLog1(): Unit = {
thrown.expect(classOf[ValidationException])
// invalid arithmetic argument
testSqlApi(
"LOG(1, 100)",
"FAIL"
"Infinity"
)
}

@Ignore
@Test
def testInvalidLog2(): Unit ={
thrown.expect(classOf[ValidationException])
// invalid arithmetic argument
testSqlApi(
"LOG(-1)",
"FAIL"
"NaN"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.batch.table

import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.plan.batch.table.JoinTest.Merger
Expand Down Expand Up @@ -127,8 +128,6 @@ class JoinTest extends TableTestBase {
util.verifyPlan(joined)
}

// TODO [FLINK-7942] [table] Reduce aliasing in RexNodes
// @Ignore
@Test
def testFilterJoinRule(): Unit = {
val util = batchTestUtil()
Expand All @@ -143,9 +142,7 @@ class JoinTest extends TableTestBase {
util.verifyPlan(results)
}

// TODO
@Ignore("Non-equi-join could be supported later.")
@Test
@Test(expected = classOf[ValidationException])
def testFullJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
Expand All @@ -154,9 +151,7 @@ class JoinTest extends TableTestBase {
util.verifyPlan(ds2.fullOuterJoin(ds1, 'b < 'd).select('c, 'g))
}

// TODO
@Ignore("Non-equi-join could be supported later.")
@Test
@Test(expected = classOf[ValidationException])
def testLeftJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
Expand All @@ -165,9 +160,7 @@ class JoinTest extends TableTestBase {
util.verifyPlan(ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g))
}

// TODO
@Ignore("Non-equi-join could be supported later.")
@Test
@Test(expected = classOf[ValidationException])
def testRightJoinNoEquiJoinPredicate(): Unit = {
val util = batchTestUtil()
val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.planner.utils.TableTestBase

import org.junit.{Ignore, Test}
import org.junit.Test

import java.sql.Timestamp

class SetOperatorsTest extends TableTestBase {

@Ignore("Support in subQuery in ExpressionConverter")
@Test
def testInWithFilter(): Unit = {
val util = batchTestUtil()
Expand Down
Loading

0 comments on commit 490e2af

Please sign in to comment.