Skip to content

Commit

Permalink
[FLINK-4183] [table] Move checking for StreamTableEnvironment into va…
Browse files Browse the repository at this point in the history
…lidation layer

This closes apache#2221.
  • Loading branch information
twalthr committed Jul 19, 2016
1 parent dd53831 commit e85f787
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ case class Distinct(child: LogicalNode) extends UnaryNode {

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Distinct on stream tables is currently not supported.")
failValidation(s"Distinct on stream tables is currently not supported.")
}
this
}
Expand All @@ -144,7 +144,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Distinct on stream tables is currently not supported.")
failValidation(s"Distinct on stream tables is currently not supported.")
}
super.validate(tableEnv)
}
Expand Down Expand Up @@ -196,7 +196,7 @@ case class Aggregate(

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Aggregate on stream tables is currently not supported.")
failValidation(s"Aggregate on stream tables is currently not supported.")
}

val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
Expand Down Expand Up @@ -277,6 +277,10 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi
}

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
failValidation(s"Union on stream tables is currently not supported.")
}

val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
if (left.output.length != right.output.length) {
failValidation(s"Union two tables of different column sizes:" +
Expand Down Expand Up @@ -304,6 +308,10 @@ case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extend
}

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
failValidation(s"Intersect on stream tables is currently not supported.")
}

val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
if (left.output.length != right.output.length) {
failValidation(s"Intersect two tables of different column sizes:" +
Expand Down Expand Up @@ -392,7 +400,7 @@ case class Join(

override def validate(tableEnv: TableEnvironment): LogicalNode = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Join on stream tables is currently not supported.")
failValidation(s"Join on stream tables is currently not supported.")
}

val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class Table(
*/
def groupBy(fields: Expression*): GroupedTable = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Group by on stream tables is currently not supported.")
throw new ValidationException(s"Group by on stream tables is currently not supported.")
}
new GroupedTable(this, fields)
}
Expand Down Expand Up @@ -392,7 +392,6 @@ class Table(
}

private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {

// check that right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
Expand Down Expand Up @@ -464,14 +463,11 @@ class Table(
* }}}
*/
def union(right: Table): Table = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Union on stream tables is currently not supported.")
}
// check that right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
}
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, false).validate(tableEnv))
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
}

/**
Expand All @@ -491,7 +487,7 @@ class Table(
if (right.tableEnv != this.tableEnv) {
throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
}
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, true).validate(tableEnv))
new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
}

/**
Expand All @@ -509,9 +505,6 @@ class Table(
* }}}
*/
def intersect(right: Table): Table = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Intersect on stream tables is currently not supported.")
}
// check that right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException(
Expand All @@ -535,9 +528,6 @@ class Table(
* }}}
*/
def intersectAll(right: Table): Table = {
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
throw new TableException(s"Intersect on stream tables is currently not supported.")
}
// check that right table belongs to the same TableEnvironment
if (right.tableEnv != this.tableEnv) {
throw new ValidationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,43 @@ package org.apache.flink.api.scala.stream.table

import org.apache.flink.api.scala.stream.utils.StreamTestData
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.{ValidationException, TableEnvironment, TableException}
import org.apache.flink.api.table.{TableEnvironment, ValidationException}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test

class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testSelectWithAggregation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
}

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testGroupBy(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
.groupBy('_1)
}

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testDistinct(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
}

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testSort(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
}

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
Expand All @@ -65,7 +65,7 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
t1.join(t2)
}

@Test(expected = classOf[TableException])
@Test(expected = classOf[ValidationException])
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
Expand All @@ -74,6 +74,24 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
t1.union(t2)
}

@Test(expected = classOf[ValidationException])
def testIntersect(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.intersect(t2)
}

@Test(expected = classOf[ValidationException])
def testIntersectAll(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.intersectAll(t2)
}

@Test(expected = classOf[ValidationException])
def testMinus(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
Expand All @@ -82,4 +100,13 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.minus(t2)
}

@Test(expected = classOf[ValidationException])
def testMinusAll(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
t1.minusAll(t2)
}
}

0 comments on commit e85f787

Please sign in to comment.