From ef5832d8f5867826a60f87e2fcaef912dc2950f6 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 24 May 2016 12:46:21 +0800 Subject: [PATCH] [FLINK-3941] [tableAPI] Add support for UNION to Table API. - Fix FLINK-3696 (type issues of DataSetUnion by forwarding expected types to input operators). This closes #2025 --- docs/apis/table.md | 24 +++ .../api/table/plan/logical/operators.scala | 4 +- .../plan/nodes/dataset/DataSetUnion.scala | 17 +- .../plan/rules/dataSet/DataSetUnionRule.scala | 5 +- .../org/apache/flink/api/table/table.scala | 25 ++- .../api/java/batch/table/UnionITCase.java | 186 ------------------ .../api/scala/batch/sql/UnionITCase.scala | 32 +-- .../api/scala/batch/table/UnionITCase.scala | 78 +++----- .../stream/table/UnsupportedOpsTest.scala | 9 + 9 files changed, 121 insertions(+), 259 deletions(-) delete mode 100644 flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java diff --git a/docs/apis/table.md b/docs/apis/table.md index 4e52a983500ed..f1e9cd1fe2268 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -434,6 +434,18 @@ Table result = left.join(right).where("a = d").select("a, b, e"); Union + +

Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical schema, i.e., field names and types.

+{% highlight java %} +Table left = tableEnv.fromDataSet(ds1, "a, b, c"); +Table right = tableEnv.fromDataSet(ds2, "a, b, c"); +Table result = left.union(right); +{% endhighlight %} + + + + + UnionAll

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schema, i.e., field names and types.

{% highlight java %} @@ -545,6 +557,18 @@ val result = left.join(right).where('a === 'd).select('a, 'b, 'e); Union + +

Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical schema, i.e., field names and types.

+{% highlight scala %} +val left = ds1.toTable(tableEnv, 'a, 'b, 'c); +val right = ds2.toTable(tableEnv, 'a, 'b, 'c); +val result = left.union(right); +{% endhighlight %} + + + + + UnionAll

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schema, i.e., field names and types.

{% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index bd299b36ab94e..6b42a7d508c63 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -236,13 +236,13 @@ case class Aggregate( } } -case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode { +case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode { override def output: Seq[Attribute] = left.output override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { left.construct(relBuilder) right.construct(relBuilder) - relBuilder.union(true) + relBuilder.union(all) } override def validate(tableEnv: TableEnvironment): LogicalNode = { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index b6f6a19ccd5f7..78f64a464a848 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -55,7 +55,7 @@ class DataSetUnion( } override def toString: String = { - "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))" + s"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))" } override def explainTerms(pw: RelWriter): RelWriter = { @@ -76,8 +76,19 @@ class DataSetUnion( tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) - val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + var leftDataSet: DataSet[Any] = null + var rightDataSet: DataSet[Any] = null + + expectedType match { + case None => + leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + rightDataSet = + right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType)) + case _ => + leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType) + rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType) + } + leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]] } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala index 7809d6d1ac1ec..ea35637efe4dc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala @@ -22,6 +22,7 @@ import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTrait import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalUnion +import org.apache.calcite.rel.rules.UnionToDistinctRule import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} class DataSetUnionRule 
@@ -33,7 +34,9 @@ class DataSetUnionRule { /** - * Only translate UNION ALL + * Only translate UNION ALL. + * Note: A distinct Union are translated into + * an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]] */ override def matches(call: RelOptRuleCall): Boolean = { val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion] diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 1e558c5448c2d..394b833dfc493 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -259,6 +259,29 @@ class Table( Join(this.logicalPlan, right.logicalPlan, JoinType.INNER, None).validate(tableEnv)) } + /** + * Union two [[Table]]s with duplicate records removed. + * Similar to an SQL UNION. The fields of the two union operations must fully overlap. + * + * Note: Both tables must be bound to the same [[TableEnvironment]]. + * + * Example: + * + * {{{ + * left.union(right) + * }}} + */ + def union(right: Table): Table = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + throw new TableException(s"Union on stream tables is currently not supported.") + } + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") + } + new Table(tableEnv, Union(logicalPlan, right.logicalPlan, false).validate(tableEnv)) + } + /** * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations * must fully overlap. @@ -276,7 +299,7 @@ class Table( if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } - new Table(tableEnv, Union(logicalPlan, right.logicalPlan).validate(tableEnv)) + new Table(tableEnv, Union(logicalPlan, right.logicalPlan, true).validate(tableEnv)) } /** diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java deleted file mode 100644 index 853cd7f5403ce..0000000000000 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.batch.table; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.BatchTableEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.table.Row; -import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.TableEnvironment; -import org.apache.flink.api.table.ValidationException; -import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class UnionITCase extends MultipleProgramsTestBase { - - public UnionITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testUnion() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, c"); - - Table selected = in1.unionAll(in2).select("c"); - - DataSet ds = tableEnv.toDataSet(selected, Row.class); - List results = ds.collect(); - String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"; - compareResultAsText(results, expected); - } - - @Test - public void testUnionWithFilter() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c"); - - Table selected = in1.unionAll(in2).where("b < 2").select("c"); - - DataSet ds = tableEnv.toDataSet(selected, Row.class); - List results = ds.collect(); - String expected = "Hi\n" + "Hallo\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testUnionIncompatibleNumberOfFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h"); - - // Must fail. Number of fields of union inputs do not match - in1.unionAll(in2); - } - - @Test(expected = ValidationException.class) - public void testUnionIncompatibleFieldsName() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d"); - - // Must fail. 
Field names of union inputs do not match - in1.unionAll(in2); - } - - @Test(expected = ValidationException.class) - public void testUnionIncompatibleFieldTypes() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, e").select("a, b, c"); - - // Must fail. Field types of union inputs do not match - in1.unionAll(in2); - } - - @Test - public void testUnionWithAggregation() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c"); - - Table selected = in1.unionAll(in2).select("c.count"); - - DataSet ds = tableEnv.toDataSet(selected, Row.class); - List results = ds.collect(); - String expected = "18"; - compareResultAsText(results, expected); - } - - @Test - public void testUnionWithJoin() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.get5TupleDataSet(env); - DataSet> ds3 = CollectionDataSets.getSmall5TupleDataSet(env); - - Table in1 = tableEnv.fromDataSet(ds1, "a, b, c"); - Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, e").select("a, b, c"); - Table in3 = tableEnv.fromDataSet(ds3, "a2, b2, d2, c2, e2").select("a2, b2, c2"); - - Table joinDs = in1.unionAll(in2).join(in3).where("a === a2").select("c, c2"); - DataSet ds = tableEnv.toDataSet(joinDs, Row.class); - List results = ds.collect(); - - String expected = "Hi,Hallo\n" + "Hallo,Hallo\n" + - "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" + - "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" + - "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n"; - compareResultAsText(results, expected); - } - - @Test(expected = ValidationException.class) - public void testUnionTablesFromDifferentEnvs() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env); - BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env); - - DataSet> ds1 = CollectionDataSets.getSmall3TupleDataSet(env); - DataSet> ds2 = CollectionDataSets.getSmall3TupleDataSet(env); - - Table in1 = tEnv1.fromDataSet(ds1, "a, b, c"); - Table in2 = tEnv2.fromDataSet(ds2, "a, b, c"); - - // Must fail. Tables are bound to different TableEnvironments. 
- in1.unionAll(in2); - } -} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala index a42d3283835d9..527eac75c592d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala @@ -39,7 +39,7 @@ class UnionITCase( extends TableProgramsTestBase(mode, configMode) { @Test - def testUnion(): Unit = { + def testUnionAll(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -58,16 +58,31 @@ class UnionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - //TODO: activate for EFFICIENT mode @Test - def testUnionWithFilter(): Unit = { + def testUnion(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - if (tEnv.getConfig.getEfficientTypeUsage) { - return - } + val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testUnionWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) val sqlQuery = "SELECT c FROM (" + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + @@ -85,17 +100,12 @@ class UnionITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - //TODO: activate for EFFICIENT mode @Test def testUnionWithAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - if (tEnv.getConfig.getEfficientTypeUsage) { - return - } - val sqlQuery = "SELECT count(c) FROM (" + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala index 29427a5d29caa..f472341284bc9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala @@ -39,7 +39,7 @@ class UnionITCase( extends TableProgramsTestBase(mode, configMode) { @Test - def testUnion(): Unit = { + def testUnionAll(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -54,7 +54,22 @@ class UnionITCase( } @Test - def testTernaryUnion(): Unit = { + def testUnion(): Unit = { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + + val unionDs = 
ds1.union(ds2).select('c) + + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTernaryUnionAll(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) @@ -72,21 +87,18 @@ class UnionITCase( } @Test - def testUnionWithFilter(): Unit = { + def testTernaryUnion(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - if (tEnv.getConfig.getEfficientTypeUsage) { - return - } - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) + val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c) + val unionDs = ds1.union(ds2).union(ds3).select('c) - val results = joinDs.toDataSet[Row].collect() - val expected = "Hi\n" + "Hallo\n" + val results = unionDs.toDataSet[Row].collect() + val expected = "Hi\n" + "Hello\n" + "Hello world\n" TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -115,50 +127,6 @@ class UnionITCase( ds1.unionAll(ds2) } - @Test - def testUnionWithAggregation(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - if (tEnv.getConfig.getEfficientTypeUsage) { - return - } - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e) - - val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count) - - val results = unionDs.toDataSet[Row].collect() - val expected = "18" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testUnionWithJoin(): Unit = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - if (tEnv.getConfig.getEfficientTypeUsage) { - return - } - - val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv,'a, 'b, 'd, 'c, 'e) - val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).toTable(tEnv, 'a2, 'b2, 'd2, 'c2, 'e2) - - val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)) - .join(ds3.select('a2, 'b2, 'c2)) - .where('a ==='a2).select('c, 'c2) - - val results = joinDs.toDataSet[Row].collect() - val expected = "Hi,Hallo\n" + "Hallo,Hallo\n" + - "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" + - "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" + - "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n" - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - @Test(expected = classOf[ValidationException]) def testUnionTablesFromDifferentEnvs(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala index a382447bcb9d0..92de6f17fa35d 
100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala @@ -64,4 +64,13 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) t1.join(t2) } + + @Test(expected = classOf[TableException]) + def testUnion(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.union(t2) + } }
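
The semantic difference between the two operators covered by this patch is that union removes duplicate records (Calcite's UnionToDistinctRule rewrites it into an Aggregate on top of a UNION ALL, which is why it is currently batch-only), while unionAll keeps them. The sketch below is illustrative only; the object name and input data are made up, and it assumes the Scala Table API imports used by the test classes in this patch.

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{Row, TableEnvironment}

object UnionExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Two inputs with identical schemas (a, b, c) and one overlapping row.
    val left = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).toTable(tEnv, 'a, 'b, 'c)
    val right = env.fromElements((2, 2L, "Hello"), (3, 2L, "Hello world")).toTable(tEnv, 'a, 'b, 'c)

    // union removes duplicates: Hi, Hello, Hello world
    val distinct = left.union(right).select('c)

    // unionAll keeps duplicates: Hi, Hello, Hello, Hello world
    val all = left.unionAll(right).select('c)

    distinct.toDataSet[Row].collect().foreach(println)
    all.toDataSet[Row].collect().foreach(println)
  }
}
{% endhighlight %}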