Skip to content

Commit

Permalink
[FLINK-3941] [tableAPI] Add support for UNION to Table API.
Browse files Browse the repository at this point in the history
- Fix FLINK-3696 (type issues of DataSetUnion by forwarding expected types to input operators).

This closes apache#2025
  • Loading branch information
yjshen authored and fhueske committed May 26, 2016
1 parent 5784f39 commit ef5832d
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 259 deletions.
24 changes: 24 additions & 0 deletions docs/apis/table.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,18 @@ Table result = left.join(right).where("a = d").select("a, b, e");

<tr>
<td><strong>Union</strong></td>
<td>
<p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical schema, i.e., field names and types.</p>
{% highlight java %}
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>UnionAll</strong></td>
<td>
<p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schema, i.e., field names and types.</p>
{% highlight java %}
Expand Down Expand Up @@ -545,6 +557,18 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e);

<tr>
<td><strong>Union</strong></td>
<td>
<p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed, both tables must have identical schema(field names and types).</p>
{% highlight scala %}
val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
val result = left.union(right);
{% endhighlight %}
</td>
</tr>

<tr>
<td><strong>UnionAll</strong></td>
<td>
<p>Similar to a SQL UNION ALL clause. Unions two tables, both tables must have identical schema(field names and types).</p>
{% highlight scala %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,13 @@ case class Aggregate(
}
}

case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode {
case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
override def output: Seq[Attribute] = left.output

override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
left.construct(relBuilder)
right.construct(relBuilder)
relBuilder.union(true)
relBuilder.union(all)
}

override def validate(tableEnv: TableEnvironment): LogicalNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DataSetUnion(
}

override def toString: String = {
"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
s"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
}

override def explainTerms(pw: RelWriter): RelWriter = {
Expand All @@ -76,8 +76,19 @@ class DataSetUnion(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
var leftDataSet: DataSet[Any] = null
var rightDataSet: DataSet[Any] = null

expectedType match {
case None =>
leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
rightDataSet =
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
case _ =>
leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
}

leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTrait
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.LogicalUnion
import org.apache.calcite.rel.rules.UnionToDistinctRule
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion}

class DataSetUnionRule
Expand All @@ -33,7 +34,9 @@ class DataSetUnionRule
{

/**
* Only translate UNION ALL
* Only translate UNION ALL.
* Note: A distinct Union are translated into
* an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]]
*/
override def matches(call: RelOptRuleCall): Boolean = {
val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,29 @@ class Table(
Join(this.logicalPlan, right.logicalPlan, JoinType.INNER, None).validate(tableEnv))
}

/**
* Union two [[Table]]s with duplicate records removed.
* Similar to an SQL UNION. The fields of the two union operations must fully overlap.
*
* Note: Both tables must be bound to the same [[TableEnvironment]].
*
* Example:
*
* {{{
* left.union(right)
* }}}
*/
def union(right: Table): Table = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Union on stream tables is currently not supported.")
}
// check that right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
}
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, false).validate(tableEnv))
}

/**
* Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
* must fully overlap.
Expand All @@ -276,7 +299,7 @@ class Table(
if (right.tableEnv != this.tableEnv) {
throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
}
new Table(tableEnv, Union(logicalPlan, right.logicalPlan).validate(tableEnv))
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, true).validate(tableEnv))
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class UnionITCase(
extends TableProgramsTestBase(mode, configMode) {

@Test
def testUnion(): Unit = {
def testUnionAll(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
Expand All @@ -58,16 +58,31 @@ class UnionITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

//TODO: activate for EFFICIENT mode
@Test
def testUnionWithFilter(): Unit = {
def testUnion(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

if (tEnv.getConfig.getEfficientTypeUsage) {
return
}
val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)"

val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)

val result = tEnv.sql(sqlQuery)

val expected = "Hi\n" + "Hello\n" + "Hello world\n"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

@Test
def testUnionWithFilter(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT c FROM (" +
"SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
Expand All @@ -85,17 +100,12 @@ class UnionITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}

//TODO: activate for EFFICIENT mode
@Test
def testUnionWithAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

if (tEnv.getConfig.getEfficientTypeUsage) {
return
}

val sqlQuery = "SELECT count(c) FROM (" +
"SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"

Expand Down
Loading

0 comments on commit ef5832d

Please sign in to comment.