Skip to content

Commit

Permalink
[FLINK-7452] [types] Add helper methods for all built-in Flink types …
Browse files Browse the repository at this point in the history
…to Types

This closes apache#4612.
  • Loading branch information
twalthr committed Dec 19, 2017
1 parent 7f99a0d commit e30066d
Show file tree
Hide file tree
Showing 8 changed files with 907 additions and 64 deletions.
6 changes: 3 additions & 3 deletions docs/dev/table/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,9 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` |
| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` |
| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` |
| `Types.DATE` | `DATE` | `java.sql.Date` |
| `Types.TIME` | `TIME` | `java.sql.Time` |
| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.SQL_DATE` | `DATE` | `java.sql.Date` |
| `Types.SQL_TIME` | `TIME` | `java.sql.Time` |
| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
Expand Down
6 changes: 3 additions & 3 deletions docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -1559,9 +1559,9 @@ The Table API is built on top of Flink's DataSet and DataStream APIs. Internally
| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` |
| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` |
| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` |
| `Types.DATE` | `DATE` | `java.sql.Date` |
| `Types.TIME` | `TIME` | `java.sql.Time` |
| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.SQL_DATE` | `DATE` | `java.sql.Date` |
| `Types.SQL_TIME` | `TIME` | `java.sql.Time` |
| `Types.SQL_TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` |
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testGetReturnType() throws Exception {
assertTrue(returnType instanceof RowTypeInfo);
RowTypeInfo rowType = (RowTypeInfo) returnType;

RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
TypeInformation<Row> expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
assertEquals(expected, rowType);
}

Expand Down
419 changes: 397 additions & 22 deletions flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/


package org.apache.flink.api.java.typeutils;

import java.lang.reflect.Field;
Expand All @@ -31,6 +31,10 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Value;

/**
* @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
*/
@Deprecated
@Public
public class TypeInfoParser {
private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.flink.table.api

import _root_.java.{lang, math, sql, util}

import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
Expand All @@ -25,55 +27,125 @@ import org.apache.flink.types.Row
import _root_.scala.annotation.varargs

/**
* This class enumerates all supported types of the Table API.
* This class enumerates all supported types of the Table API & SQL.
*/
object Types {

val STRING = JTypes.STRING
val BOOLEAN = JTypes.BOOLEAN
/**
* Returns type information for a Table API string or SQL VARCHAR type.
*/
val STRING: TypeInformation[String] = JTypes.STRING

/**
* Returns type information for a Table API boolean or SQL BOOLEAN type.
*/
val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN

/**
* Returns type information for a Table API byte or SQL TINYINT type.
*/
val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE

/**
* Returns type information for a Table API short or SQL SMALLINT type.
*/
val SHORT: TypeInformation[lang.Short] = JTypes.SHORT

/**
* Returns type information for a Table API integer or SQL INT/INTEGER type.
*/
val INT: TypeInformation[lang.Integer] = JTypes.INT

val BYTE = JTypes.BYTE
val SHORT = JTypes.SHORT
val INT = JTypes.INT
val LONG = JTypes.LONG
val FLOAT = JTypes.FLOAT
val DOUBLE = JTypes.DOUBLE
val DECIMAL = JTypes.DECIMAL
/**
* Returns type information for a Table API long or SQL BIGINT type.
*/
val LONG: TypeInformation[lang.Long] = JTypes.LONG

val SQL_DATE = JTypes.SQL_DATE
val SQL_TIME = JTypes.SQL_TIME
val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
/**
* Returns type information for a Table API float or SQL FLOAT/REAL type.
*/
val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT

/**
* Returns type information for a Table API integer or SQL DOUBLE type.
*/
val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE

/**
* Generates row type information.
* Returns type information for a Table API big decimal or SQL DECIMAL type.
*/
val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC

/**
* Returns type information for a Table API SQL date or SQL DATE type.
*/
val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE

/**
* Returns type information for a Table API SQL time or SQL TIME type.
*/
val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME

/**
* Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
*/
val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP

/**
* Returns type information for a Table API interval of months.
*/
val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS

/**
* Returns type information for a Table API interval milliseconds.
*/
val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS

/**
* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
*
* A row is a variable-length, null-aware composite type for storing multiple values in a
* deterministic field order. Every field can be null regardless of the field's type.
* The type of row fields cannot be automatically inferred; therefore, it is required to provide
* type information whenever a row is used.
*
* A row type consists of zero or more fields with a field name and a corresponding type.
* <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
* row instances must strictly adhere to the schema defined by the type info.
*
* The fields have the default names (f0, f1, f2 ..).
* This method generates type information with fields of the given types; the fields have
* the default names (f0, f1, f2 ..).
*
* @param types types of row fields; e.g. Types.STRING, Types.INT
* @param types The types of the row fields, e.g., Types.STRING, Types.INT
*/
@varargs
def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
JTypes.ROW(types: _*)
}

/**
* Generates row type information.
* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
* and with given names.
*
* A row is a variable-length, null-aware composite type for storing multiple values in a
* deterministic field order. Every field can be null independent of the field's type.
* The type of row fields cannot be automatically inferred; therefore, it is required to provide
* type information whenever a row is used.
*
* <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all
* row instances must strictly adhere to the schema defined by the type info.
*
* A row type consists of zero or more fields with a field name and a corresponding type.
* Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
*
* @param names names of row fields, e.g. "userid", "name"
* @param types types of row fields; e.g. Types.STRING, Types.INT
* @param fieldNames array of field names
* @param types array of field types
*/
def ROW(names: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
JTypes.ROW_NAMED(names, types: _*)
def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
JTypes.ROW_NAMED(fieldNames, types: _*)
}

/**
* Generates type information for an array consisting of Java primitive elements.
* Generates type information for an array consisting of Java primitive elements. The elements
* do not support null values.
*
* @param elementType type of the array elements; e.g. Types.INT
*/
Expand All @@ -93,30 +165,35 @@ object Types {
}

/**
* Generates type information for an array consisting of Java object elements.
* Generates type information for an array consisting of Java object elements. Null values for
* elements are supported.
*
* @param elementType type of the array elements; e.g. Types.STRING or Types.INT
*/
def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
ObjectArrayTypeInfo.getInfoFor(elementType)
}

/**
* Generates type information for a Java HashMap.
* Generates type information for a Java HashMap. Null values in keys are not supported. An
* entry's value can be null.
*
* @param keyType type of the keys of the map e.g. Types.STRING
* @param valueType type of the values of the map e.g. Types.STRING
*/
def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
def MAP[K, V](
keyType: TypeInformation[K],
valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
new MapTypeInfo(keyType, valueType)
}

/**
* Generates type information for a Multiset.
* Generates type information for a Multiset. A Multiset is baked by a Java HashMap and maps an
* arbitrary key to an integer value. Null values in keys are not supported.
*
* @param elementType type of the elements of the multiset e.g. Types.STRING
*/
def MULTISET(elementType: TypeInformation[_]): TypeInformation[_] = {
def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = {
new MultisetTypeInfo(elementType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.flink.table.api.stream.table.stringexpr

import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.runtime.utils._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils._
import org.apache.flink.types.Row
import org.junit.Test
Expand Down
Loading

0 comments on commit e30066d

Please sign in to comment.