From 772b8ba7409848889c0c7178513f5adb0e422653 Mon Sep 17 00:00:00 2001
From: Timo Walther
Date: Fri, 31 May 2019 08:05:23 +0200
Subject: [PATCH] [FLINK-12254][table] Port and deprecate old types listing

This closes #8581.
---
 .../org/apache/flink/table/api/Types.java     | 248 ++++++++++++++++++
 .../typeutils/TimeIndicatorTypeInfo.java      |   7 +
 .../table/typeutils/TimeIntervalTypeInfo.java |   7 +
 .../org/apache/flink/table/api/Types.scala    | 199 --------------
 .../descriptors/LiteralValueValidator.scala   |   7 +-
 .../flink/table/python/PythonTableUtils.scala |  24 +-
 .../sources/tsextractors/ExistingField.scala  |   4 +-
 .../flink/table/descriptors/RowtimeTest.scala |   4 +-
 .../table/runtime/batch/sql/JoinITCase.scala  |   5 +-
 9 files changed, 285 insertions(+), 220 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/Types.scala

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
new file mode 100644
index 0000000000000..f318217b2f80c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Types.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+/**
+ * This class enumerates all supported types of the Table API & SQL.
+ *
+ * @deprecated This class will be removed in future versions as it uses the old type system. It is
+ *     recommended to use {@link DataTypes} instead which uses the new type system based on
+ *     instances of {@link DataType}. Please make sure to use either the old or the new type
+ *     system consistently to avoid unintended behavior. See the website documentation
+ *     for more information.
+ */
+@Deprecated
+public final class Types {
+
+	// we use SQL-like naming for types and avoid Java keyword clashes
+	// CHECKSTYLE.OFF: MethodName
+
+	/**
+	 * Returns type information for a Table API string or SQL VARCHAR type.
+	 */
+	public static TypeInformation<String> STRING() {
+		return org.apache.flink.api.common.typeinfo.Types.STRING;
+	}
+
+	/**
+	 * Returns type information for a Table API boolean or SQL BOOLEAN type.
+	 */
+	public static TypeInformation<Boolean> BOOLEAN() {
+		return org.apache.flink.api.common.typeinfo.Types.BOOLEAN;
+	}
+
+	/**
+	 * Returns type information for a Table API byte or SQL TINYINT type.
+	 */
+	public static TypeInformation<Byte> BYTE() {
+		return org.apache.flink.api.common.typeinfo.Types.BYTE;
+	}
+
+	/**
+	 * Returns type information for a Table API short or SQL SMALLINT type.
+	 */
+	public static TypeInformation<Short> SHORT() {
+		return org.apache.flink.api.common.typeinfo.Types.SHORT;
+	}
+
+	/**
+	 * Returns type information for a Table API integer or SQL INT/INTEGER type.
+	 */
+	public static TypeInformation<Integer> INT() {
+		return org.apache.flink.api.common.typeinfo.Types.INT;
+	}
+
+	/**
+	 * Returns type information for a Table API long or SQL BIGINT type.
+	 */
+	public static TypeInformation<Long> LONG() {
+		return org.apache.flink.api.common.typeinfo.Types.LONG;
+	}
+
+	/**
+	 * Returns type information for a Table API float or SQL FLOAT/REAL type.
+	 */
+	public static TypeInformation<Float> FLOAT() {
+		return org.apache.flink.api.common.typeinfo.Types.FLOAT;
+	}
+
+	/**
+	 * Returns type information for a Table API double or SQL DOUBLE type.
+	 */
+	public static TypeInformation<Double> DOUBLE() {
+		return org.apache.flink.api.common.typeinfo.Types.DOUBLE;
+	}
+
+	/**
+	 * Returns type information for a Table API big decimal or SQL DECIMAL type.
+	 */
+	public static TypeInformation<BigDecimal> DECIMAL() {
+		return org.apache.flink.api.common.typeinfo.Types.BIG_DEC;
+	}
+
+	/**
+	 * Returns type information for a Table API SQL date or SQL DATE type.
+	 */
+	public static TypeInformation<java.sql.Date> SQL_DATE() {
+		return org.apache.flink.api.common.typeinfo.Types.SQL_DATE;
+	}
+
+	/**
+	 * Returns type information for a Table API SQL time or SQL TIME type.
+	 */
+	public static TypeInformation<java.sql.Time> SQL_TIME() {
+		return org.apache.flink.api.common.typeinfo.Types.SQL_TIME;
+	}
+
+	/**
+	 * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
+	 */
+	public static TypeInformation<java.sql.Timestamp> SQL_TIMESTAMP() {
+		return org.apache.flink.api.common.typeinfo.Types.SQL_TIMESTAMP;
+	}
+
+	/**
+	 * Returns type information for a Table API interval of months.
+	 */
+	public static TypeInformation<Integer> INTERVAL_MONTHS() {
+		return TimeIntervalTypeInfo.INTERVAL_MONTHS;
+	}
+
+	/**
+	 * Returns type information for a Table API interval of milliseconds.
+	 */
+	public static TypeInformation<Long> INTERVAL_MILLIS() {
+		return TimeIntervalTypeInfo.INTERVAL_MILLIS;
+	}
+
+	/**
+	 * Returns type information for {@link Row} with fields of the given types.
+	 *
+	 * <p>A row is a variable-length, null-aware composite type for storing multiple values in a
+	 * deterministic field order. Every field can be null regardless of the field's type. The type of
+	 * row fields cannot be automatically inferred; therefore, it is required to provide type information
+	 * whenever a row is used.
+	 *
+	 * <p>The schema of rows can have up to Integer.MAX_VALUE fields, however, all row
+	 * instances must strictly adhere to the schema defined by the type info.
+	 *
+	 * <p>This method generates type information with fields of the given types; the fields have
+	 * the default names (f0, f1, f2 ..).
+	 *
+	 * @param types The types of the row fields, e.g., Types.STRING(), Types.INT()
+	 */
+	public static TypeInformation<Row> ROW(TypeInformation<?>... types) {
+		return org.apache.flink.api.common.typeinfo.Types.ROW(types);
+	}
+
+	/**
+	 * Returns type information for {@link Row} with fields of the given types and with given names.
+	 *
+	 * <p>A row is a variable-length, null-aware composite type for storing multiple values in a
+	 * deterministic field order. Every field can be null independent of the field's type. The type of
+	 * row fields cannot be automatically inferred; therefore, it is required to provide type information
+	 * whenever a row is used.
+	 *
+	 * <p>The schema of rows can have up to Integer.MAX_VALUE fields, however, all row
+	 * instances must strictly adhere to the schema defined by the type info.
+	 *
+	 * @param fieldNames The array of field names
+	 * @param types The types of the row fields, e.g., Types.STRING(), Types.INT()
+	 */
+	public static TypeInformation<Row> ROW(String[] fieldNames, TypeInformation<?>[] types) {
+		return org.apache.flink.api.common.typeinfo.Types.ROW_NAMED(fieldNames, types);
+	}
+
+	/**
+	 * Generates type information for an array consisting of Java primitive elements. The elements do
+	 * not support null values.
+	 *
+	 * @param elementType type of the array elements; e.g. Types.INT()
+	 */
+	public static TypeInformation<?> PRIMITIVE_ARRAY(TypeInformation<?> elementType) {
+		if (elementType.equals(BOOLEAN())) {
+			return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(BYTE())) {
+			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(SHORT())) {
+			return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(INT())) {
+			return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(LONG())) {
+			return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(FLOAT())) {
+			return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO;
+		} else if (elementType.equals(DOUBLE())) {
+			return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO;
+		}
+		throw new TableException(
+			String.format(
+				"%s cannot be an element of a primitive array. Only Java primitive types are supported.",
+				elementType));
+	}
+
+	/**
+	 * Generates type information for an array consisting of Java object elements. Null values for
+	 * elements are supported.
+	 *
+	 * @param elementType type of the array elements; e.g. Types.INT()
+	 */
+	public static <E> TypeInformation<E[]> OBJECT_ARRAY(TypeInformation<E> elementType) {
+		return ObjectArrayTypeInfo.getInfoFor(elementType);
+	}
+
+	/**
+	 * Generates type information for a Java HashMap. Null values in keys are not supported. An
+	 * entry's value can be null.
+	 *
+	 * @param keyType type of the keys of the map e.g. Types.STRING()
+	 * @param valueType type of the values of the map e.g. Types.STRING()
+	 */
+	public static <K, V> TypeInformation<Map<K, V>> MAP(TypeInformation<K> keyType, TypeInformation<V> valueType) {
+		return new MapTypeInfo<>(keyType, valueType);
+	}
+
+	/**
+	 * Generates type information for a Multiset. A Multiset is backed by a Java HashMap and maps an
+	 * arbitrary key to an integer value. Null values in keys are not supported.
+	 *
+	 * @param elementType type of the elements of the multiset e.g. Types.STRING()
+	 */
+	public static <E> TypeInformation<Map<E, Integer>> MULTISET(TypeInformation<E> elementType) {
+		return new MultisetTypeInfo<>(elementType);
+	}
+
+	// CHECKSTYLE.ON: MethodName
+
+	private Types() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
index e46bf3f96d885..efe99421df6b9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java
@@ -25,14 +25,21 @@
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
 import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+import org.apache.flink.table.api.DataTypes;
 
 import java.sql.Timestamp;
 
 /**
  * Type information for indicating event or processing time. However, it behaves like a
  * regular SQL timestamp but is serialized as Long.
+ *
+ * @deprecated This class will be removed in future versions as it is used for the old type system. It
+ *     is recommended to use {@link DataTypes} instead. Please make sure to use either the old
+ *     or the new type system consistently to avoid unintended behavior. See the website documentation
+ *     for more information.
  */
 @Internal
+@Deprecated
 public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> {
 
 	private final boolean isEventTime;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.java
index 63529ac0a0a9b..578efcd4a861b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.java
@@ -28,6 +28,7 @@
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.table.api.DataTypes;
 
 import java.lang.reflect.Constructor;
 import java.util.Objects;
@@ -36,8 +37,14 @@
 
 /**
  * Type information for SQL INTERVAL types.
+ *
+ * @deprecated This class will be removed in future versions as it is used for the old type system. It
+ *     is recommended to use {@link DataTypes} instead. Please make sure to use either the old
+ *     or the new type system consistently to avoid unintended behavior. See the website documentation
+ *     for more information.
  */
 @Internal
+@Deprecated
 public final class TimeIntervalTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
 	private static final long serialVersionUID = -1816179424364825258L;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/Types.scala
deleted file mode 100644
index 4be137d1b65fc..0000000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/Types.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api
-
-import _root_.java.{lang, math, sql, util}
-
-import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.types.Row
-
-import _root_.scala.annotation.varargs
-
-/**
-  * This class enumerates all supported types of the Table API & SQL.
-  */
-object Types {
-
-  /**
-    * Returns type information for a Table API string or SQL VARCHAR type.
-    */
-  val STRING: TypeInformation[String] = JTypes.STRING
-
-  /**
-    * Returns type information for a Table API boolean or SQL BOOLEAN type.
-    */
-  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
-
-  /**
-    * Returns type information for a Table API byte or SQL TINYINT type.
-    */
-  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
-
-  /**
-    * Returns type information for a Table API short or SQL SMALLINT type.
-    */
-  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
-
-  /**
-    * Returns type information for a Table API integer or SQL INT/INTEGER type.
-    */
-  val INT: TypeInformation[lang.Integer] = JTypes.INT
-
-  /**
-    * Returns type information for a Table API long or SQL BIGINT type.
-    */
-  val LONG: TypeInformation[lang.Long] = JTypes.LONG
-
-  /**
-    * Returns type information for a Table API float or SQL FLOAT/REAL type.
-    */
-  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
-
-  /**
-    * Returns type information for a Table API integer or SQL DOUBLE type.
-    */
-  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
-
-  /**
-    * Returns type information for a Table API big decimal or SQL DECIMAL type.
-    */
-  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
-
-  /**
-    * Returns type information for a Table API SQL date or SQL DATE type.
-    */
-  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
-
-  /**
-    * Returns type information for a Table API SQL time or SQL TIME type.
-    */
-  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
-
-  /**
-    * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
-    */
-  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
-
-  /**
-    * Returns type information for a Table API interval of months.
-    */
-  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS
-
-  /**
-    * Returns type information for a Table API interval milliseconds.
-    */
-  val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS
-
-  /**
-    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
-    *
-    * A row is a variable-length, null-aware composite type for storing multiple values in a
-    * deterministic field order. Every field can be null regardless of the field's type.
-    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
-    * type information whenever a row is used.
-    *
-    * <p>The schema of rows can have up to Integer.MAX_VALUE fields, however, all
-    * row instances must strictly adhere to the schema defined by the type info.
-    *
-    * This method generates type information with fields of the given types; the fields have
-    * the default names (f0, f1, f2 ..).
-    *
-    * @param types The types of the row fields, e.g., Types.STRING, Types.INT
-    */
-  @varargs
-  def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
-    JTypes.ROW(types: _*)
-  }
-
-  /**
-    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
-    * and with given names.
-    *
-    * A row is a variable-length, null-aware composite type for storing multiple values in a
-    * deterministic field order. Every field can be null independent of the field's type.
-    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
-    * type information whenever a row is used.
-    *
-    * <p>The schema of rows can have up to Integer.MAX_VALUE fields, however, all
-    * row instances must strictly adhere to the schema defined by the type info.
-    *
-    * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
-    *
-    * @param fieldNames array of field names
-    * @param types array of field types
-    */
-  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
-    JTypes.ROW_NAMED(fieldNames, types: _*)
-  }
-
-  /**
-    * Generates type information for an array consisting of Java primitive elements. The elements
-    * do not support null values.
-    *
-    * @param elementType type of the array elements; e.g. Types.INT
-    */
-  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
-    elementType match {
-      case BOOLEAN => PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
-      case BYTE => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
-      case SHORT => PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
-      case INT => PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
-      case LONG => PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
-      case FLOAT => PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
-      case DOUBLE => PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
-      case _ =>
-        throw new TableException(s"$elementType cannot be an element of a primitive array." +
-          s"Only Java primitive types are supported.")
-    }
-  }
-
-  /**
-    * Generates type information for an array consisting of Java object elements. Null values for
-    * elements are supported.
-    *
-    * @param elementType type of the array elements; e.g. Types.STRING or Types.INT
-    */
-  def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
-    ObjectArrayTypeInfo.getInfoFor(elementType)
-  }
-
-  /**
-    * Generates type information for a Java HashMap. Null values in keys are not supported. An
-    * entry's value can be null.
-    *
-    * @param keyType type of the keys of the map e.g. Types.STRING
-    * @param valueType type of the values of the map e.g. Types.STRING
-    */
-  def MAP[K, V](
-      keyType: TypeInformation[K],
-      valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
-    new MapTypeInfo(keyType, valueType)
-  }
-
-  /**
-    * Generates type information for a Multiset. A Multiset is baked by a Java HashMap and maps an
-    * arbitrary key to an integer value. Null values in keys are not supported.
-    *
-    * @param elementType type of the elements of the multiset e.g. Types.STRING
-    */
-  def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = {
-    new MultisetTypeInfo(elementType)
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
index ba488d8154e46..4e85858250441 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/LiteralValueValidator.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.descriptors
 
 import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt}
 
-import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.table.api.{TableException, ValidationException}
 
 /**
   * Validator for [[LiteralValue]].
@@ -53,7 +54,7 @@ class LiteralValueValidator(keyPrefix: String) extends HierarchyDescriptorValida
     val valueKey = s"$keyPrefix${LiteralValueValidator.VALUE}"
     val typeInfo = properties.getType(typeKey)
     typeInfo match {
-      case Types.DECIMAL => properties.validateBigDecimal(valueKey, false)
+      case Types.BIG_DEC => properties.validateBigDecimal(valueKey, false)
       case Types.BOOLEAN => properties.validateBoolean(valueKey, false)
       case Types.BYTE => properties.validateByte(valueKey, false)
       case Types.DOUBLE => properties.validateDouble(valueKey, false)
@@ -98,7 +99,7 @@ object LiteralValueValidator {
     val valueKey = s"$keyPrefix$VALUE"
     val typeInfo = properties.getType(typeKey)
     typeInfo match {
-      case Types.DECIMAL => properties.getBigDecimal(valueKey)
+      case Types.BIG_DEC => properties.getBigDecimal(valueKey)
       case Types.BOOLEAN => properties.getBoolean(valueKey)
       case Types.BYTE => properties.getByte(valueKey)
       case Types.DOUBLE => properties.getDouble(valueKey)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/python/PythonTableUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/python/PythonTableUtils.scala
index 70a5111c07a26..76a399df7a0c4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/python/PythonTableUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/python/PythonTableUtils.scala
@@ -84,67 +84,67 @@
    * null if the type of obj is unexpected because Python doesn't enforce the type.
    */
   private def convertTo(dataType: TypeInformation[_]): Any => Any = dataType match {
-    case Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
       case b: Boolean => b
     }
 
-    case Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
       case c: Byte => c
       case c: Short => c.toByte
       case c: Int => c.toByte
       case c: Long => c.toByte
     }
 
-    case Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
       case c: Byte => c.toShort
      case c: Short => c
       case c: Int => c.toShort
       case c: Long => c.toShort
     }
 
-    case Types.INT => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.INT => (obj: Any) => nullSafeConvert(obj) {
       case c: Byte => c.toInt
       case c: Short => c.toInt
       case c: Int => c
       case c: Long => c.toInt
     }
 
-    case Types.LONG => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.LONG => (obj: Any) => nullSafeConvert(obj) {
      case c: Byte => c.toLong
       case c: Short => c.toLong
       case c: Int => c.toLong
       case c: Long => c
     }
 
-    case Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
       case c: Float => c
       case c: Double => c.toFloat
     }
 
-    case Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
       case c: Float => c.toDouble
       case c: Double => c
     }
 
-    case Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
       case c: java.math.BigDecimal => c
     }
 
-    case Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
       case c: Int => new Date(c * 86400000)
     }
 
-    case Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
       case c: Long => new Time(c / 1000)
       case c: Int => new Time(c.toLong / 1000)
     }
 
-    case Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
       case c: Long => new Timestamp(c / 1000)
       case c: Int => new Timestamp(c.toLong / 1000)
     }
 
-    case Types.STRING => (obj: Any) => nullSafeConvert(obj) {
+    case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
       case _ => obj.toString
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index f21db029001e5..c9f447738a8e5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.sources.tsextractors
 
 import java.util
 
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, Types}
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.Rowtime
 import org.apache.flink.table.expressions._
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index e7e37325d2f70..3424088a09410 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.descriptors
 
 import java.util
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.{CustomAssigner, CustomExtractor}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.sources.tsextractors.TimestampExtractor
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 6246609851d3a..05235783db02b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.batch.sql
 
 import java.util
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.Types
@@ -521,8 +522,8 @@ class JoinITCase(
     )
 
     implicit val typeInfo = Types.ROW(
-      fieldNames = Array("a", "b", "c"),
-      types = Array(Types.INT,
+      Array[String]("a", "b", "c"),
+      Array[TypeInformation[_]](Types.INT,
         Types.LONG,
         Types.MAP(Types.STRING, Types.STRING))
     )
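
Note (not part of the patch): the @deprecated Javadoc above only points readers at DataTypes, so a rough sketch of what the switch looks like for a named row type is included below. The class and field names here are invented for illustration; the Types methods are the ones added in this patch, and the DataTypes factory calls are the ones its Javadoc recommends.

// Illustrative sketch: old TypeInformation-based listing vs. the new DataType-based listing.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class TypesMigrationSketch {

	public static void main(String[] args) {
		// Old type system: the now-deprecated Types listing returns TypeInformation.
		TypeInformation<Row> oldRowType = Types.ROW(
			new String[] {"name", "cnt"},
			new TypeInformation<?>[] {Types.STRING(), Types.LONG()});

		// New type system: DataTypes returns DataType instances; names follow SQL types,
		// e.g. LONG() becomes BIGINT().
		DataType newRowType = DataTypes.ROW(
			DataTypes.FIELD("name", DataTypes.STRING()),
			DataTypes.FIELD("cnt", DataTypes.BIGINT()));

		System.out.println(oldRowType);
		System.out.println(newRowType);
	}
}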