Skip to content

Commit

Permalink
[FLINK-4554] [table] Add support for array types
Browse files Browse the repository at this point in the history
This closes apache#2919.
  • Loading branch information
twalthr committed Dec 7, 2016
1 parent 13150a4 commit 4414008
Show file tree
Hide file tree
Showing 18 changed files with 1,122 additions and 39 deletions.
158 changes: 154 additions & 4 deletions docs/dev/table_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,14 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |

Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row. Generic types and arrays are treated as a black box within Table API and SQL yet. Composite types, however, are fully supported types where fields of a composite type can be accessed using the `.get()` operator in Table API and dot operator (e.g. `MyTable.pojoColumn.myField`) in SQL. Composite types can also be flattened using `.flatten()` in Table API or `MyTable.pojoColumn.*` in SQL.

Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row.

Generic types are treated as a black box within Table API and SQL yet.

Composite types, however, are fully supported types where fields of a composite type can be accessed using the `.get()` operator in Table API and dot operator (e.g. `MyTable.pojoColumn.myField`) in SQL. Composite types can also be flattened using `.flatten()` in Table API or `MyTable.pojoColumn.*` in SQL.

Array types can be accessed using the `myArray.at(1)` operator in Table API and `myArray[1]` operator in SQL. Array literals can be created using `array(1, 2, 3)` in Table API and `ARRAY[1, 2, 3]` in SQL.

{% top %}

Expand Down Expand Up @@ -2038,6 +2045,50 @@ COMPOSITE.get(INT)
</td>
</tr>

<tr>
<td>
{% highlight java %}
ARRAY.at(INT)
{% endhighlight %}
</td>
<td>
<p>Returns the element at a particular position in an array. The index starts at 1.</p>
</td>
</tr>

<tr>
<td>
{% highlight java %}
array(ANY [, ANY ]*)
{% endhighlight %}
</td>
<td>
<p>Creates an array from a list of values. The array will be an array of objects (not primitives).</p>
</td>
</tr>

<tr>
<td>
{% highlight java %}
ARRAY.cardinality()
{% endhighlight %}
</td>
<td>
<p>Returns the number of elements of an array.</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ARRAY.element()
{% endhighlight %}
</td>
<td>
<p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
</td>
</tr>

</tbody>
</table>

Expand Down Expand Up @@ -2599,6 +2650,50 @@ COMPOSITE.get(INT)
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ARRAY.at(INT)
{% endhighlight %}
</td>
<td>
<p>Returns the element at a particular position in an array. The index starts at 1.</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
array(ANY [, ANY ]*)
{% endhighlight %}
</td>
<td>
<p>Creates an array from a list of values. The array will be an array of objects (not primitives).</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ARRAY.cardinality()
{% endhighlight %}
</td>
<td>
<p>Returns the number of elements of an array.</p>
</td>
</tr>

<tr>
<td>
{% highlight scala %}
ARRAY.element()
{% endhighlight %}
</td>
<td>
<p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
</td>
</tr>

</tbody>
</table>
</div>
Expand Down Expand Up @@ -3368,8 +3463,6 @@ CAST(value AS type)
</tbody>
</table>


<!-- Disabled temporarily in favor of composite type support
<table class="table table-bordered">
<thead>
<tr>
Expand All @@ -3379,6 +3472,7 @@ CAST(value AS type)
</thead>

<tbody>
<!-- Disabled temporarily in favor of composite type support
<tr>
<td>
{% highlight text %}
Expand All @@ -3400,9 +3494,32 @@ ROW (value [, value]* )
<p>Creates a row from a list of values.</p>
</td>
</tr>
-->

<tr>
<td>
{% highlight text %}
array ‘[’ index ‘]
{% endhighlight %}
</td>
<td>
<p>Returns the element at a particular position in an array. The index starts at 1.</p>
</td>
</tr>

<tr>
<td>
{% highlight text %}
ARRAY ‘[’ value [, value ]*]
{% endhighlight %}
</td>
<td>
<p>Creates an array from a list of values.</p>
</td>
</tr>

</tbody>
</table>
-->

<table class="table table-bordered">
<thead>
Expand Down Expand Up @@ -3657,6 +3774,39 @@ tableName.compositeType.*
</tbody>
</table>

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 40%">Array functions</th>
<th class="text-center">Description</th>
</tr>
</thead>

<tbody>
<tr>
<td>
{% highlight text %}
CARDINALITY(ARRAY)
{% endhighlight %}
</td>
<td>
<p>Returns the number of elements of an array.</p>
</td>
</tr>

<tr>
<td>
{% highlight text %}
ELEMENT(ARRAY)
{% endhighlight %}
</td>
<td>
<p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
</td>
</tr>
</tbody>
</table>

</div>
</div>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import java.sql.{Date, Time, Timestamp}

import org.apache.calcite.avatica.util.DateTimeUtils._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.api.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.api.table.expressions._
import java.math.{BigDecimal => JBigDecimal}

import scala.language.implicitConversions

Expand Down Expand Up @@ -461,6 +462,29 @@ trait ImplicitExpressionOperations {
* into a flat representation where every subtype is a separate field.
*/
def flatten() = Flattening(expr)

/**
* Accesses the element of an array based on an index (starting at 1).
*
* @param index position of the element (starting at 1)
* @return value of the element
*/
def at(index: Expression) = ArrayElementAt(expr, index)

/**
* Returns the number of elements of an array.
*
* @return number of elements
*/
def cardinality() = ArrayCardinality(expr)

/**
* Returns the sole element of an array with a single element. Returns null if the array is
* empty. Throws an exception if the array has more than one element.
*
* @return the first and only element of an array with a single element
*/
def element() = ArrayElement(expr)
}

/**
Expand Down Expand Up @@ -540,18 +564,24 @@ trait ImplicitExpressionConversions {
implicit def float2Literal(d: Float): Expression = Literal(d)
implicit def string2Literal(str: String): Expression = Literal(str)
implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
implicit def javaDec2Literal(javaDec: java.math.BigDecimal): Expression = Literal(javaDec)
implicit def scalaDec2Literal(scalaDec: scala.math.BigDecimal): Expression =
implicit def javaDec2Literal(javaDec: JBigDecimal): Expression = Literal(javaDec)
implicit def scalaDec2Literal(scalaDec: BigDecimal): Expression =
Literal(scalaDec.bigDecimal)
implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp)
implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression =
Literal(sqlTimestamp)
implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array)
}

// ------------------------------------------------------------------------------------------------
// Expressions with no parameters
// ------------------------------------------------------------------------------------------------

// we disable the object checker here as it checks for capital letters of objects
// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
// scalastyle:off object.name

/**
* Returns the current SQL date in UTC time zone.
*/
Expand Down Expand Up @@ -645,5 +675,17 @@ object temporalOverlaps {
}
}

/**
* Creates an array of literals. The array will be an array of objects (not primitives).
*/
object array {

/**
* Creates an array of literals. The array will be an array of objects (not primitives).
*/
def apply(head: Expression, tail: Expression*): Expression = {
ArrayConstructor(head +: tail.toSeq)
}
}

// scalastyle:on object.name
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.api.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
import org.apache.flink.api.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType}
import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple

Expand Down Expand Up @@ -102,11 +103,22 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}

override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
new ArrayRelDataType(
ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
elementType,
true)

private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
case ct: CompositeType[_] =>
new CompositeRelDataType(ct, this)

// TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
case pa: PrimitiveArrayTypeInfo[_] =>
new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)

case oa: ObjectArrayTypeInfo[_, _] =>
new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)

case ti: TypeInformation[_] =>
new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])

Expand Down Expand Up @@ -190,6 +202,10 @@ object FlinkTypeFactory {
// ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
case ROW | CURSOR => new NothingTypeInfo

case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
arrayRelDataType.typeInfo

case _@t =>
throw TableException(s"Type is not supported: $t")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ object CodeGenUtils {
def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]


// ----------------------------------------------------------------------------------------------

def requireNumeric(genExpr: GeneratedExpression) =
Expand Down Expand Up @@ -189,6 +188,16 @@ object CodeGenUtils {
throw new CodeGenException("Interval expression type expected.")
}

def requireArray(genExpr: GeneratedExpression) =
if (!TypeCheckUtils.isArray(genExpr.resultType)) {
throw new CodeGenException("Array expression type expected.")
}

def requireInteger(genExpr: GeneratedExpression) =
if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
throw new CodeGenException("Integer expression type expected.")
}

// ----------------------------------------------------------------------------------------------

def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
Expand Down
Loading

0 comments on commit 4414008

Please sign in to comment.