From 6961d9bad804543a939be231dac36011d5b38c4a Mon Sep 17 00:00:00 2001 From: lincoln-lil Date: Wed, 17 Jul 2019 15:22:43 +0800 Subject: [PATCH] [FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner This closes #9146 --- .../functions/sql/FlinkSqlOperatorTable.java | 10 ++--- .../expressions/ScalarFunctionsTest.scala | 1 + .../table/expressions/TemporalTypesTest.scala | 41 +++++++++++++++++-- .../runtime/functions/SqlDateTimeUtils.java | 6 ++- 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java index d3d2e5abcec74..31bf0f475eac6 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java @@ -570,7 +570,7 @@ public void lookupOperatorOverloads( public static final SqlFunction DATE_FORMAT = new SqlFunction( "DATE_FORMAT", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE), + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE), InferTypes.RETURN_TYPE, OperandTypes.or( OperandTypes.sequence("'(TIMESTAMP, FORMAT)'", @@ -792,7 +792,7 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { public static final SqlFunction TO_TIMESTAMP = new SqlFunction( "TO_TIMESTAMP", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE), + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.NUMERIC), @@ -811,7 +811,7 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { public static final SqlFunction TO_DATE = new SqlFunction( "TO_DATE", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.TO_NULLABLE), + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.NUMERIC), @@ -822,7 +822,7 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { public static final SqlFunction TO_TIMESTAMP_TZ = new SqlFunction( "TO_TIMESTAMP_TZ", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE), + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING), @@ -842,7 +842,7 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) { public static final SqlFunction CONVERT_TZ = new SqlFunction( "CONVERT_TZ", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE), + ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index f05314bb19670..71c6c01a266ef 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -2556,6 +2556,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { @Test def testToTimestamp(): Unit = { + testSqlApi("to_timestamp('abc')", "null") testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000") testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000") testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala index a43338d928981..4d0cf1dd8bfbe 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala @@ -642,16 +642,52 @@ class TemporalTypesTest extends ExpressionTestBase { testSqlApi("FROM_UNIXTIME(cast(NUll as bigInt))", nullable) + testSqlApi("TO_DATE(cast(NUll as varchar))", nullable) + + testSqlApi("TO_TIMESTAMP_TZ(cast(NUll as varchar), 'Asia/Shanghai')", nullable) + + testSqlApi( + "DATE_FORMAT_TZ(cast(NUll as timestamp), 'yyyy/MM/dd HH:mm:ss', 'Asia/Shanghai')", + nullable) + } + + @Test + def testInvalidInputCase(): Unit = { + val invalidStr = "invalid value" + testSqlApi(s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')", nullable) + testSqlApi(s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", nullable) + testSqlApi(s"TO_DATE('$invalidStr')", nullable) + testSqlApi(s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')", nullable) + testSqlApi( + s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')", + nullable) + } + + @Test + def testTypeInferenceWithInvalidInput(): Unit = { + val invalidStr = "invalid value" + val cases = Seq( + s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')", + s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", + s"TO_DATE('$invalidStr')", + s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')", + s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')") + + cases.foreach { + caseExpr => + testSqlApi( + s"CASE WHEN ($caseExpr) is null THEN '$nullable' ELSE '$notNullable' END", nullable) + } } @Test def testTimeZoneFunction(): Unit = { testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'Asia/Shanghai')", "2018-03-14 03:00:00.000") testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')", - "2018-03-14 03:00:00.000") + "2018-03-14 03:00:00.000") testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')", - "2018-03-14 19:00:00") + "2018-03-14 19:00:00") testSqlApi("TO_TIMESTAMP_TZ(f14, 'UTC')", "null") @@ -661,7 +697,6 @@ class TemporalTypesTest extends ExpressionTestBase { testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'invalid_tz')", "2018-03-14 11:00:00.000") } - // ---------------------------------------------------------------------------------------------- override def testData: Row = { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java index a63efdeaf4de7..118dca72c7cb8 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java @@ -362,7 +362,11 @@ public static String dateFormatTz(long ts, String tzStr) { * @param tzTo the target time zone */ public static String convertTz(String dateStr, String format, String tzFrom, String tzTo) { - return dateFormatTz(toTimestampTz(dateStr, format, tzFrom), tzTo); + Long ts = toTimestampTz(dateStr, format, tzFrom); + if (null != ts) { // avoid NPE + return dateFormatTz(ts, tzTo); + } + return null; } public static String convertTz(String dateStr, String tzFrom, String tzTo) {