Skip to content

Commit

Permalink
[FLINK-7491] [table] Add MultiSet type and COLLECT aggregation functi…
Browse files Browse the repository at this point in the history
…on to SQL.

This closes apache#4585.
  • Loading branch information
Shuyi Chen authored and fhueske committed Oct 10, 2017
1 parent 4047be4 commit dccdba1
Show file tree
Hide file tree
Showing 15 changed files with 677 additions and 10 deletions.
13 changes: 12 additions & 1 deletion docs/dev/table/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
| `Types.OBJECT_ARRAY` | `ARRAY` | e.g. `java.lang.Byte[]`|
| `Types.MAP` | `MAP` | `java.util.HashMap` |
| `Types.MULTISET` | `MULTISET` | e.g. `java.util.HashMap<String, Integer>` for a multiset of `String` |


Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row.
Expand Down Expand Up @@ -2164,6 +2165,17 @@ VAR_SAMP(value)
<p>Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.</p>
</td>
</tr>

<tr>
<td>
{% highlight text %}
COLLECT(value)
{% endhighlight %}
</td>
<td>
<p>Returns a multiset of the <i>value</i>s. Null input <i>value</i>s are ignored. Returns an empty multiset if only null values are added.</p>
</td>
</tr>
</tbody>
</table>

Expand Down Expand Up @@ -2283,7 +2295,6 @@ The following functions are not supported yet:

- Binary string operators and functions
- System functions
- Collection functions
- Distinct aggregate functions like COUNT DISTINCT

{% top %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public int getArity() {

@Override
public int getTotalFields() {
return 2;
return 1;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * {@link TypeInformation} for multiset types of the Java API.
 *
 * <p>The type is expressed as a {@link MapTypeInfo} whose key type is the element type of the
 * multiset and whose value type is always {@link Integer}.
 *
 * @param <T> The type of the elements in the multiset.
 */
@PublicEvolving
public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {

	private static final long serialVersionUID = 1L;

	/**
	 * Creates the type information from the class of the multiset's elements.
	 *
	 * @param elementTypeClass The class of the elements in the multiset.
	 */
	public MultisetTypeInfo(Class<T> elementTypeClass) {
		super(elementTypeClass, Integer.class);
	}

	/**
	 * Creates the type information from the type information of the multiset's elements.
	 *
	 * @param elementTypeInfo The type information of the elements in the multiset.
	 */
	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
	}

	// ------------------------------------------------------------------------
	//  MultisetTypeInfo specific properties
	// ------------------------------------------------------------------------

	/**
	 * Gets the type information for the elements contained in the multiset.
	 *
	 * @return The element type information, which is the key type of the underlying map type.
	 */
	public TypeInformation<T> getElementTypeInfo() {
		return getKeyTypeInfo();
	}

	@Override
	public String toString() {
		return "Multiset<" + getElementTypeInfo() + ">";
	}

	/**
	 * Two multiset types are equal if both are {@code MultisetTypeInfo}s that can equal each
	 * other and their element types are equal.
	 */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof MultisetTypeInfo)) {
			return false;
		}

		final MultisetTypeInfo<?> that = (MultisetTypeInfo<?>) obj;
		return that.canEqual(this) && getElementTypeInfo().equals(that.getElementTypeInfo());
	}

	@Override
	public int hashCode() {
		// Only the element type takes part in equals(), so only it is hashed.
		final int elementHash = getElementTypeInfo().hashCode();
		return 31 * elementHash + 1;
	}

	@Override
	public boolean canEqual(Object obj) {
		if (obj == null) {
			return false;
		}
		// Exact class match: instances of other map type infos are never considered equal.
		return getClass() == obj.getClass();
	}

	/**
	 * Creates a {@code MultisetTypeInfo} for the given element type information.
	 *
	 * @param componentInfo The type information of the multiset's elements; must not be null.
	 * @param <C> The type of the elements in the multiset.
	 * @return The type information of a multiset with elements of the given type.
	 */
	@SuppressWarnings("unchecked")
	@PublicEvolving
	public static <C> MultisetTypeInfo<C> getInfoFor(TypeInformation<C> componentInfo) {
		checkNotNull(componentInfo);
		return new MultisetTypeInfo<C>(componentInfo);
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeInformationTestBase;

/**
 * Runs the generic {@link TypeInformationTestBase} checks against {@link MultisetTypeInfo}.
 */
public class MultisetTypeInfoTest extends TypeInformationTestBase<MultisetTypeInfo<?>> {

	/**
	 * Supplies three multiset types with different element types, covering both the
	 * {@code TypeInformation}-based and the {@code Class}-based constructor.
	 */
	@Override
	protected MultisetTypeInfo<?>[] getTestData() {
		final MultisetTypeInfo<String> stringMultiset =
			new MultisetTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO);
		final MultisetTypeInfo<Integer> intMultiset =
			new MultisetTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
		final MultisetTypeInfo<Long> longMultiset =
			new MultisetTypeInfo<>(Long.class);

		return new MultisetTypeInfo<?>[] {stringMultiset, intMultiset, longMultiset};
	}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.table.api

import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row

Expand Down Expand Up @@ -110,4 +110,13 @@ object Types {
def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
new MapTypeInfo(keyType, valueType)
}

/**
  * Creates the type information of a multiset whose elements have the given type.
  *
  * @param elementType type information of the multiset's elements, e.g. Types.STRING
  * @return type information describing the multiset
  */
def MULTISET(elementType: TypeInformation[_]): TypeInformation[_] =
  new MultisetTypeInfo(elementType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.table.plan.schema._
Expand Down Expand Up @@ -156,6 +156,13 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
isNullable)

case mts: MultisetTypeInfo[_] =>
new MultisetRelDataType(
mts,
createTypeFromTypeInfo(mts.getElementTypeInfo, isNullable = true),
isNullable
)

case ti: TypeInformation[_] =>
new GenericRelDataType(
ti,
Expand Down Expand Up @@ -213,6 +220,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
canonize(relType)
}

/**
  * Creates the relational type of a multiset with the given element type.
  *
  * The element type is converted to Flink type information and wrapped in a
  * [[MultisetTypeInfo]]. The resulting type is created as non-nullable and canonized.
  *
  * @param elementType relational type of the multiset's elements
  * @param maxCardinality not used by this implementation
  * @return the canonized multiset type
  */
override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
  val elementTypeInfo = FlinkTypeFactory.toTypeInfo(elementType)
  val multisetTypeInfo = MultisetTypeInfo.getInfoFor(elementTypeInfo)
  canonize(new MultisetRelDataType(multisetTypeInfo, elementType, isNullable = false))
}

override def createTypeWithNullability(
relDataType: RelDataType,
isNullable: Boolean): RelDataType = {
Expand All @@ -234,6 +249,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
case map: MapRelDataType =>
new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)

case multiSet: MultisetRelDataType =>
new MultisetRelDataType(multiSet.typeInfo, multiSet.getComponentType, isNullable)

case generic: GenericRelDataType =>
new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)

Expand Down Expand Up @@ -403,6 +421,10 @@ object FlinkTypeFactory {
val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
mapRelDataType.typeInfo

case MULTISET if relDataType.isInstanceOf[MultisetRelDataType] =>
val multisetRelDataType = relDataType.asInstanceOf[MultisetRelDataType]
multisetRelDataType.typeInfo

case _@t =>
throw TableException(s"Type is not supported: $t")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class ExpressionReducer(config: TableConfig)
case (SqlTypeName.ANY, _) |
(SqlTypeName.ROW, _) |
(SqlTypeName.ARRAY, _) |
(SqlTypeName.MAP, _) => None
(SqlTypeName.MAP, _) |
(SqlTypeName.MULTISET, _) => None

case (_, e) => Some(e)
}
Expand Down Expand Up @@ -112,7 +113,11 @@ class ExpressionReducer(config: TableConfig)
val unreduced = constExprs.get(i)
unreduced.getType.getSqlTypeName match {
// we insert the original expression for object literals
case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY | SqlTypeName.MAP =>
case SqlTypeName.ANY |
SqlTypeName.ROW |
SqlTypeName.ARRAY |
SqlTypeName.MAP |
SqlTypeName.MULTISET =>
reducedValues.add(unreduced)
case _ =>
val reducedValue = reduced.getField(reducedIdx)
Expand Down
Loading

0 comments on commit dccdba1

Please sign in to comment.